What exactly is big data?
The next big thing? A buzzword? Nobody knows?
For us: another tool in the “ways to work with data” toolkit.
Many describe it with The Four V's (or 5 V's, or 7 V's).
What I think people really mean…
Big data: when you have so much data, it's annoying.
For a programmer, “annoying” probably means so much data it can't be processed on one computer.
But “one computer” from EC2 can mean hundreds of cores, terabytes of memory, and effectively unlimited storage.
Do you have big data? Probably not.
Even if most people don't work with truly big data most of the time, it's nice to have the tools to do it when necessary.
Sometimes it's nice to know your computation can scale if a megabyte of data becomes many gigabytes.
Or maybe “can't be processed on one computer” should be “can't be processed in a time I'm willing to wait on one computer”.
If one computer won't do the job, you need several: each one can store some of the data and do some of the processing, and they can work together to generate final results.
Compute cluster: several computers working together to do some work.
The compute cluster we have for use in this course: 4 nodes, 60 cores, 128 GB memory, 24 TB storage.
Really, once you have >1 CPU core or >1 computer, the interesting problems start appearing.
Actually managing work on a cluster sucks. You have all of the problems from an OS course (concurrency, interprocess communication, scheduling, …) except magnified by being a distributed system (some computers fail, network latency, …).
Do you want to worry about all that? Me neither. Obvious solution: let somebody else do it.
Apache Hadoop is a collection of tools for managing compute clusters.
The goal here is to express the computation we want to do, in such a way that the work can be sent out to the cluster and done in parallel.
Spark will let us do that.
YARN will take our job and make sure all of the pieces get done somewhere, somehow (with Spark's help).
HDFS will store all of the pieces of our data files, so they're there when YARN/Spark wants to work on them.
Let's forget about the compute cluster for now. We'll look at Spark as a tool to express ourselves.
We can use Spark locally on one computer, with some idea that we can scale up later (but with many traps to fall into).
Spark should already be available on the CSIL workstations.
Likely-works installation instructions for non-Windows:
pip install --user pyspark
It's possible to run Spark on Windows, but no support can be provided.
More robust instructions for Linux/OSX if installing with Pip fails: download a Spark release from the Apache Spark downloads page (pre-built for Hadoop, e.g. spark-3.5.3-bin-hadoop3) and unpack it somewhere.
export PYSPARK_PYTHON=python3
Then you can work in the pyspark shell or start jobs with spark-submit.
A complete Spark program:
import sys
from pyspark.sql import SparkSession, functions, types

spark = SparkSession.builder.appName('example 1').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

assert sys.version_info >= (3,10)  # make sure we have Python 3.10+
assert spark.version >= '3.5'  # make sure we have Spark 3.5+

data = spark.read.csv('cities.csv', header=True, inferSchema=True)
data.show()
The last two lines are real work. The rest is boilerplate that will be in all of our Spark programs.
A couple of setup commands if you have installed Spark manually:
export PYSPARK_PYTHON=python3
export PATH=${PATH}:/location/of/spark-3.5.3-bin-hadoop3/bin
Then run the job:
spark-submit spark-1.py
The example reads a file cities.csv. Let's say:
city,population,area
Vancouver,2463431,2878.52
Calgary,1392609,5110.21
Toronto,5928040,5905.71
Montreal,4098927,4604.26
Halifax,403390,5496.31
It will then output:
+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
|  Toronto|   5928040|5905.71|
| Montreal|   4098927|4604.26|
|  Halifax|    403390|5496.31|
+---------+----------+-------+
This line…
data = spark.read.csv('cities.csv', header=True, inferSchema=True)
…creates an object that is the primary way we'll store and manipulate data in Spark: a DataFrame.
A Pandas DataFrame and a Spark DataFrame are not the same thing. Spark's DataFrames were inspired by Pandas (and DataFrames in R).
Spark's DataFrames work differently, often because of their basic job: letting you do things in parallel across a cluster. Sometimes just because of design differences.
Some things will be familiar: like Pandas, Spark DataFrames are tables with named, typed columns and rows of data.
A lot of the operations are spelled differently, but you can see the similarities:
cities = spark.read.csv('cities.csv', header=True, inferSchema=True)
cities.printSchema()
root
 |-- city: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- area: double (nullable = true)
We can do operations that feel familiar:
c = cities.filter(cities['area'] < 5000)
c = c.select(c['city'], c['population'])
c.show()
+---------+----------+
|     city|population|
+---------+----------+
| Montreal|   4098927|
|Vancouver|   2463431|
+---------+----------+
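For comparison, roughly the same filter-and-select in Pandas might look like this (a sketch, assuming the same cities.csv; the variable names are mine):

import pandas as pd

cities_pd = pd.read_csv('cities.csv')        # Pandas also infers column types
c_pd = cities_pd[cities_pd['area'] < 5000]   # like .filter()
c_pd = c_pd[['city', 'population']]          # like .select()
print(c_pd)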
And we can write the results:
c.write.json('spark-output', mode='overwrite')
But it doesn't create a file. It creates a directory with several files:
$ ls spark-output/
part-00000-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json part-00001-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json part-00002-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json _SUCCESS
The “real” output is the concatenation of those files.
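If you want that data back in a Spark program, you can just read the whole directory: Spark treats the contents of all the part files as the data. A minimal sketch, assuming the spark-output directory from above:

c_again = spark.read.json('spark-output')   # reads every part-*.json in the directory
c_again.show()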
One thing we have seen on a DataFrame: the .show() method. It prints the first rows (20 by default) so you can see what's going on.
c.show()
+---------+----------+
|     city|population|
+---------+----------+
| Montreal|   4098927|
|Vancouver|   2463431|
+---------+----------+
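The defaults can be adjusted with .show()'s standard arguments if you need more (or less) detail:

c.show(2)                # only the first 2 rows
c.show(truncate=False)   # don't truncate long values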
It's often useful to inspect a DataFrame and then bail out of the computation so you can see the output near the end. A common pattern I find myself writing:
def main(input_dir, output_dir):
    ⋮
    some_df = …
    #some_df.show()
    #return
    ⋮

if __name__ == '__main__':
    input_dir = sys.argv[1]
    output_dir = sys.argv[2]
    main(input_dir, output_dir)
Of course, you can also df.write.… to see what's in a DataFrame. There are other methods/properties that might be useful (but .describe() may be expensive):
print(cities.dtypes)
print(cities.schema)  # or cities.printSchema()
cities.describe().show()
[('city', 'string'), ('population', 'int'), ('area', 'double')]
StructType(List(StructField(city,StringType,true),StructField(population,IntegerType,true),StructField(area,DoubleType,true)))
+-------+---------+------------------+-----------------+
|summary|     city|        population|             area|
+-------+---------+------------------+-----------------+
|  count|        5|                 5|                5|
|   mean|     null|         2857279.4|         4799.002|
| stddev|     null|2196201.7527625505|1176.229890612375|
|    min|  Calgary|            403390|          2878.52|
|    max|Vancouver|           5928040|          5905.71|
+-------+---------+------------------+-----------------+
In the previous code, we saw two methods on DataFrames:
c = cities.filter(cities['area'] < 5000)
c = c.select(c['city'], c['population'])
… and they feel very SQL-like. (Actually, .where() is a synonym for .filter()… very SQL-like.)
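One more SQL-ish touch: .filter()/.where() also accept a condition written as an SQL expression string instead of a column expression. A small sketch with the cities DataFrame:

cities.filter(cities['area'] < 5000).show()   # column expression
cities.where('area < 5000').show()            # equivalent SQL-style string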
The .select() method creates a new DataFrame of the columns you specify: either existing columns or calculated values.
some_values = cities.select(
    cities['city'],
    cities['area'] * 1000000
)
some_values.show()
+---------+----------------+
|     city|(area * 1000000)|
+---------+----------------+
|Vancouver|       2.87852E9|
|  Calgary|       5.11021E9|
|  Toronto|       5.90571E9|
| Montreal|       4.60426E9|
|  Halifax|       5.49631E9|
+---------+----------------+
That could have been prettier…
some_values = cities.select(
    cities['city'],
    (cities['area'] * 1000000).alias('area_m2')
)
some_values.show()
+---------+---------+
|     city|  area_m2|
+---------+---------+
|Vancouver|2.87852E9|
|  Calgary|5.11021E9|
|  Toronto|5.90571E9|
| Montreal|4.60426E9|
|  Halifax|5.49631E9|
+---------+---------+
The .filter() method keeps rows where the condition is true.
some_values = cities.filter(cities['population'] % 2 == 1)
some_values.show()
+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
| Montreal|   4098927|4604.26|
+---------+----------+-------+
These methods all create a new DataFrame object. These are exactly equivalent:
cities = spark.read.csv('cities.csv', header=True, inferSchema=True)
c_small = cities.filter(cities['area'] < 5000)
c_droparea = c_small.select(c_small['city'], c_small['population'])
c_droparea.show()
cities = spark.read.csv('cities.csv', header=True, inferSchema=True)
cities.filter(cities['area'] < 5000).select(cities['city'], cities['population']).show()
This is typical: operations tend to build new DataFrames, and it's common to have many as you construct the final result. Giving them a name is just a question of style.
There are many methods on DataFrames that you'll find useful.
# Return a new DataFrame...
c = cities.withColumn('area_m2', cities['area'] * 1000000)
c = cities.drop('area')  # DF without 'area' column
c = cities.drop_duplicates()  # remove duplicate rows
c = cities.na.drop()  # remove any rows with NaN values
c = cities.sort([cities['city'], cities['population']])
c = cities.sample(withReplacement=False, fraction=0.5)

# Returns a number...
r = cities.stat.corr('population', 'area')  # .stat.corr takes column names as strings
Let's start thinking about big data…
The underlying assumption is that a Spark DataFrame will not fit in any single computer's memory or disk. All we can hope for is to store pieces of it on many different computers.
All Spark DataFrames are partitioned to make this work.
Subsets of rows are handled separately by different processes/threads. Each piece can (hopefully) be operated on in parallel.
If operations can truly be done in parallel without much coordination, \(n\) processes can do the work almost \(n\) times faster. (Unfortunately, that won't always be true.)
Important point: a partition is the smallest unit that can be operated on in parallel.
If you have 2 partitions, you'll be using at most 2 processor cores to work on that DataFrame.
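You can check (and change) the partitioning if you need to; a minimal sketch using the cities DataFrame, with the partition counts chosen arbitrarily:

print(cities.rdd.getNumPartitions())   # how many partitions this DataFrame has
more = cities.repartition(8)           # shuffle the data into 8 partitions
fewer = cities.coalesce(1)             # combine down to 1 partition (avoids a full shuffle)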
This extends to input and output. Each thread/process/core/executor is responsible for reading individual input files. When writing, they can each write in parallel.
Thus the output: each partition was written as a separate file in the output directory.
We will need a shared filesystem to make that work out: more later.
Here's roughly what happened to get us three output files in the earlier example: the DataFrame had three partitions, and each partition was written as its own part-* file.
One partition is empty: that's odd but not a disaster.
Generally, we will give a directory as our input.
Spark will read all files in that directory, and automatically decompress them if needed. The semantics: the “data” is the combination of everything in those files.
The way the data is split between files is the way the DataFrame will be partitioned initially.
We'll often have a collection of (possibly compressed) files as output. Usually you can inspect them with something like:
cat output/part-*.csv | less
cat output/part-*.csv.gz | zless
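Compressed output like that comes from asking the writer for it; a small sketch (the compression option is a standard DataFrameWriter option):

c.write.csv('spark-output', mode='overwrite', compression='gzip')   # writes part-*.csv.gz files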
Running Spark on a local machine isn't crazy: it will use all the processor cores available. That's not easy with NumPy/Pandas.
… but it's not the point. We want to be able to attack larger problems.
The Hadoop infrastructure is what runs our cluster. Review:
We'll be using YARN to run Spark jobs on the cluster, and HDFS for input and output.
Running Spark jobs in the two ways we'll see (locally and on the cluster) is generally the same.
… but a few differences will confuse you if you're not aware of them.
On your computer, when you do something like this:
spark-submit program.py /some/path/inputs output
… your job runs on your computer using files (/some/path/inputs
and output
) from your computer's filesystem. That's the default.
On the cluster it is submitted to YARN and the path names refer to files on the cluster HDFS. (There is a configuration there that tells Spark to use YARN/HDFS.)
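So on the cluster, the same style of command works, but the paths are HDFS paths (the input path here is hypothetical):

spark-submit program.py /courses/353/some-input output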
Here's roughly how things are arranged on our cluster: we log in to a gateway node and submit work from there; the cluster nodes behind it run HDFS and YARN.
HDFS is maybe badly named: it's not a filesystem in the standard Unix way.
It's not mounted: you can't do ls, cp, cat, tab completion, etc.
It does store files in directories with owners, permissions, etc.
There are commands for HDFS that are analogous:
hdfs dfs -ls                        # list files in your HDFS home
hdfs dfs -ls /courses/353           # list course data sets
hdfs dfs -copyFromLocal data        # copy file/directory to HDFS
hdfs dfs -copyToLocal output        # copy file/directory to gateway
hdfs dfs -cat output/part* | less   # show output files
hdfs dfs -cat output/part* | zless  # show compressed output
What HDFS is actually doing:
Whenever we create a file in HDFS, it will take care of replicating it. We end up with something like (with 5 files, assuming one block each, replication 3):
The file part-00000 can be accessed locally on nodes 1, 3, 4.
It's YARN's responsibility to get the compute work done: manage the CPU and memory resources.
When you start a job on the cluster, it's a request to YARN to give you the resources you need.
I.e., the spark-submit command is the way we generally interact with YARN.
It's easier to move the compute work to the data than to move the data.
YARN tries to access data from a node where the data can be found on HDFS.
Spark asks YARN: “I need to do some work on HDFS files f1, f2, f3. Please give me three cores near them with 2GB of memory each.” Or simply: “I need to do work on 20 partitions, so please give me 20 cores with 2GB each.”
YARN can respond “I have two available” and start the tasks there.
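The resource side of that request is what the standard Spark-on-YARN options to spark-submit control; a sketch:

spark-submit --num-executors 3 --executor-cores 1 --executor-memory 2g program.py input output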
You can have a look at what's going on in the HDFS and YARN web front-ends if you have SSH port forwards set up, as the instructions suggest.