Big Data and Spark

CMPT 353

Big Data

What exactly is big data?

The next big thing? A buzzword? Nobody knows?

For us: another tool in the ways to work with data toolkit.

Big Data

Many describe with The Four V's (or 5 V's or 7 V's).

  • Volume: the amount of data.
  • Velocity: the data arrives quickly and constantly.
  • Variety: many differently-structured (or less-structured) input data sets.
  • Veracity: some data might be incorrect, or of unknown correctness.

Big Data

What I think people really mean…

Big data: when you have so much data, it's annoying.

Big Data

For a programmer, annoying probably means so much data it can't be processed on one computer.

But one computer from EC2 can mean hundreds of cores, terabytes of memory, and effectively unlimited storage.

Do you have big data? Probably not.

Big Data

Even if most people don't work with truly-big data most of the time, it's nice to have the tools to do it when necessary.

Sometimes it's nice to know your computation can scale if a megabyte of data becomes many gigabytes.

Or maybe can't be processed on one computer should be can't be processed in a time I'm willing to wait on one computer

Compute Clusters

If one computer won't do the job, you need several: each one can store some of the data and do some of the processing, and they can work together to generate final results.

Compute cluster: several computers working together to do some work.

Compute Clusters

The compute cluster we have for use in this course: 4 nodes, 60 cores, 128 GB memory, 24 TB storage.

Really, once you have >1 CPU core or >1 computer, the interesting problems start appearing.

Compute Clusters

Actually managing work on a cluster sucks. You have all of the problems from an OS course (concurrency, interprocess communication, scheduling, …) except magnified by being a distributed system (some computers fail, network latency, …).

Do you want to worry about all that? Me neither. Obvious solution: let somebody else do it.

Hadoop

Apache Hadoop is a collection of tools for managing compute clusters.

  • YARN: managing compute jobs in the cluster.
  • HDFS: Hadoop Distributed File System, for storing data on the cluster's nodes.
  • Spark: a framework to do computation on YARN (or elsewhere).
  • [dozens of other things, but we'll only use those ↑.]

Hadoop

The goal here is to express the computation we want to do, in such a way that the work can be sent out to the cluster and done in parallel.

Spark will let us do that.

YARN will take our job and make sure all of the pieces get done somewhere, somehow (with Spark's help).

HDFS will store all of the pieces of our data files, so they're there when YARN/​Spark wants to work on them.

Small-Data Spark

Let's forget about the compute cluster for now. We'll look at Spark as a tool to express ourselves.

We can use Spark locally on one computer, with some idea that we can scale up later (but with many traps to fall into).

Small-Data Spark

Spark should already be available on the CSIL workstations.

Likely-works installation instructions for non-Windows:

pip install --user pyspark

It's possible to run Spark on Windows, but no support can be provided.

Small-Data Spark

More robust instructions for Linux/OSX if installing with Pip fails:

  1. Download Spark (3.5.3, latest pre-built).
  2. Set an environment variable so it uses Python 3 (not 2):
    export PYSPARK_PYTHON=python3
  3. Run the pyspark shell or start jobs with spark-submit.

First Spark Program

A complete Spark program:

import sys
from pyspark.sql import SparkSession, functions, types
 
spark = SparkSession.builder.appName('example 1').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

assert sys.version_info >= (3,10) # make sure we have Python 3.10+
assert spark.version >= '3.5' # make sure we have Spark 3.5+

data = spark.read.csv('cities.csv', header=True,
                      inferSchema=True)
data.show()

The last two lines are real work. The rest is boilerplate that will be in all of our Spark programs.

First Spark Program

A couple of setup commands if you have installed Spark manually:

export PYSPARK_PYTHON=python3
export PATH=${PATH}:/location/of/spark-3.5.3-bin-hadoop3/bin

Then run the job:

spark-submit spark-1.py

First Spark Program

The example reads a file cities.csv. Let's say:

city,population,area
Vancouver,2463431,2878.52
Calgary,1392609,5110.21
Toronto,5928040,5905.71
Montreal,4098927,4604.26
Halifax,403390,5496.31

It will then output:

+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
|  Toronto|   5928040|5905.71|
| Montreal|   4098927|4604.26|
|  Halifax|    403390|5496.31|
+---------+----------+-------+

Spark DataFrames

This line…

data = spark.read.csv('cities.csv', header=True,
                      inferSchema=True)

…creates an object that is the primary way we'll store and manipulate data in Spark: a DataFrame.

Spark DataFrames

A Pandas DataFrame and a Spark DataFrame are not the same thing. Spark's DataFrames were inspired by Pandas (and DataFrames in R).

Spark's DataFrames work differently, often because of their basic job: letting you do things in parallel across a cluster. Sometimes just because of design differences.

Spark DataFrames

Some things will be familiar. Like Pandas, Spark DataFrames:

  • have rows and columns.
  • have a schema: each column has a name and a type.
  • are operated on by implicitly doing operations on every element, not by explicitly iterating.
  • can be created from files; written to files.

Spark DataFrames

A lot of the operations are spelled differently, but you can see the similarities:

cities = spark.read.csv('cities.csv', header=True,
                        inferSchema=True)
cities.printSchema()
root
 |-- city: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- area: double (nullable = true)

Spark DataFrames

We can do operations that feel familiar:

c = cities.filter(cities['area'] < 5000)
c = c.select(c['city'], c['population'])
c.show()
+---------+----------+
|     city|population|
+---------+----------+
| Montreal|   4098927|
|Vancouver|   2463431|
+---------+----------+

Spark DataFrames

And we can write the results:

c.write.json('spark-output', mode='overwrite')

But it doesn't create a file. It creates a directory with several files:

$ ls spark-output/
part-00000-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json
part-00001-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json
part-00002-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json
_SUCCESS

The real output is the concatenation of those files.

Inspecting DataFrames

One thing we have seen on a DataFrame: the .show() method. It prints the first rows (20 by default) so you can see what's going on.

c.show()
+---------+----------+
|     city|population|
+---------+----------+
| Montreal|   4098927|
|Vancouver|   2463431|
+---------+----------+

Inspecting DataFrames

It's often useful to inspect a DataFrame and then bail out of the computation so you can see the output near the end. A common pattern I find myself writing:

def main(input_dir, output_dir):
    ⋮ 
    some_df = …
    #some_df.show()
    #return
    ⋮

if __name__ == '__main__':
    input_dir = sys.argv[1]
    output_dir = sys.argv[2]
    main(input_dir, output_dir)

Inspecting DataFrames

Of course, you can also df.write.… to see what's in a DataFrame. There are other methods/​properties that might be useful (but .describe() may be expensive):

print(cities.dtypes)
print(cities.schema) # or cities.printSchema()
cities.describe().show()
[('city', 'string'), ('population', 'int'), ('area', 'double')]
StructType(List(StructField(city,StringType,true),StructField(population,IntegerType,true),StructField(area,DoubleType,true)))
+-------+---------+------------------+-----------------+
|summary|     city|        population|             area|
+-------+---------+------------------+-----------------+
|  count|        5|                 5|                5|
|   mean|     null|         2857279.4|         4799.002|
| stddev|     null|2196201.7527625505|1176.229890612375|
|    min|  Calgary|            403390|          2878.52|
|    max|Vancouver|           5928040|          5905.71|
+-------+---------+------------------+-----------------+

Operating on DataFrames

In the previous code, we saw two methods on DataFrames:

c = cities.filter(cities['area'] < 5000)
c = c.select(c['city'], c['population'])

… and they feel very SQL-like. (Actually .where() is a synonym for .filter()very SQL-like.)

Operating on DataFrames

The .select() method creates a new DataFrame of the columns you specify: either existing or a calculation.

some_values = cities.select(
    cities['city'],
    cities['area'] * 1000000
)
some_values.show()
+---------+----------------+
|     city|(area * 1000000)|
+---------+----------------+
|Vancouver|       2.87852E9|
|  Calgary|       5.11021E9|
|  Toronto|       5.90571E9|
| Montreal|       4.60426E9|
|  Halifax|       5.49631E9|
+---------+----------------+

Operating on DataFrames

That could have been prettier…

some_values = cities.select(
    cities['city'],
    (cities['area'] * 1000000).alias('area_m2')
)
some_values.show()
+---------+---------+
|     city|  area_m2|
+---------+---------+
|Vancouver|2.87852E9|
|  Calgary|5.11021E9|
|  Toronto|5.90571E9|
| Montreal|4.60426E9|
|  Halifax|5.49631E9|
+---------+---------+

Operating on DataFrames

The .filter() method keeps rows where the condition is true.

some_values = cities.filter(cities['population'] % 2 == 1)
some_values.show()
+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
| Montreal|   4098927|4604.26|
+---------+----------+-------+

Operating on DataFrames

These methods all create a new DataFrame object. These are exactly equivalent:

cities = spark.read.csv('cities.csv', header=True, inferSchema=True)
c_small = cities.filter(cities['area'] < 5000)
c_droparea = c_small.select(c_small['city'], c_small['population'])
c_droparea.show()
cities = spark.read.csv('cities.csv', header=True, inferSchema=True)
cities.filter(cities['area'] < 5000).select(cities['city'], cities['population']).show()

This is typical: operations tend to build new DataFrames, and it's common to have many as you construct the final result. Giving them a name is just a question of style.

Operating on DataFrames

There are many methods on DataFrames that you'll find useful.

# Return a new DataFrame...
c = cities.withColumn('area_m2', cities['area'] * 1000000)
c = cities.drop('area') # DF without 'area' column
c = cities.drop_duplicates() # remove duplicate rows
c = cities.na.drop() # remove any rows with NaN values
c = cities.sort([cities['city'], cities['population']])
c = cities.sample(withReplacement=False, fraction=0.5)
 
# Returns a number...
r = cities.stat.corr(cities['population'], cities['area'])

DataFrames are Partitioned

Let's start thinking about big data…

The underlying assumption is that a Spark Data­Frame will not fit in any single computer's memory or disk. All we can hope for is to store pieces of it on many different computers.

All Spark DataFrames are partitioned to make this work.

DataFrames are Partitioned

Subsets of rows are handled separately by different processes/​threads. Each piece can (hopefully) be operated on in parallel.

If operations can truly be done in parallel without much coordination, \(n\) processes can do the work almost \(n\) times faster. (Unfortunately, that won't always be true.)

DataFrames are Partitioned

Important point: a partition is the smallest unit that can be operated on in parallel.

If you have 2 partitions, you'll be using at most 2 processor cores to work on that DataFrame.

Spark Input & Output

This extends to input and output. Each thread/​process/​core/​executor is responsible for reading individual input files. When writing, they can each write in parallel.

Thus the output: each partition was written as a separate file in the output directory.

We will need a shared filesystem to make that work out: more later.

Spark Input & Output

Here's roughly what happened to get us three output files in the earlier example:

DataFrame partitions

One partition is empty: that's odd but not a disaster.

Spark Input & Output

Generally, we will give a directory as our input.

Spark will read all files in that directory, and automatically decompress them if needed. The semantics: the “data” is the combination of everything in those files.

The way the data is split between files is the way the DataFrame will be partitioned initially.

Spark Input & Output

We'll often have a collection of (possibly compressed) files as output. Usually you can inspect them with something like:

cat output/part-*.csv | less
cat output/part-*.csv.gz | zless

Hadoop + Spark

Running Spark on a local machine isn't crazy: it will use all the processor cores available. That's not easy with NumPy/Pandas.

… but it's not the point. We want to be able to attack larger problems.

Hadoop + Spark

The Hadoop infrastructure is what runs our cluster. Review:

  • YARN: manages compute jobs on the cluster. Responsible for getting computation done.
  • HDFS: Hadoop Distributed File System, for storing data on the cluster's nodes.

We'll be using YARN to run Spark jobs on the cluster, and HDFS for input and output.

Hadoop + Spark

Running Spark jobs in the two ways we'll see is generally the same.

  • Locally, on your computer, with the local filesystem.
  • On the cluster, controlled by YARN, with HDFS.

… but a few differences will confuse you if you're not aware of them.

Hadoop + Spark

On your computer, when you do something like this:

spark-submit program.py /some/path/inputs output

… your job runs on your computer using files (/some/path/inputs and output) from your computer's filesystem. That's the default.

On the cluster it is submitted to YARN and the path names refer to files on the cluster HDFS. (There is a configuration there that tells Spark to use YARN/​HDFS.)

Hadoop + Spark

Here's how things are arranged on our cluster:

our cluster layout
* * *

HDFS

HDFS is maybe badly named: it's not a filesystem in the standard Unix way.

It's not mounted: you can't do ls, cp, cat, tab completion, etc.

It does store files in directories with owners, permissions, etc.

HDFS

There are commands for HDFS that are analogous:

hdfs dfs -ls  # list files in your HDFS home
hdfs dfs -ls /courses/353  # list course data sets
hdfs dfs -copyFromLocal data  # copy file/directory to HDFS
hdfs dfs -copyToLocal output  # copy file/directory to gateway
hdfs dfs -cat output/part* | less  # show output files
hdfs dfs -cat output/part* | zless  # show compressed output

HDFS

What HDFS is actually doing:

  • Storing files on the \(n\) worker nodes.
  • … split into blocks (of 128 MB on our cluster).
  • … with each block replicated in a couple of places (by default 3; default 2 on our cluster).
  • Making them available to our compute jobs when needed.

HDFS

Whenever we create a file in HDFS, it will take care of replicating it. We end up with something like (with 5 files, assuming one block each, replication 3):

HDFs replication

The file part-00000 can be accessed locally on nodes 1, 3, 4.

YARN

It's YARN's responsibility to get the compute work done: manage the CPU and memory resources.

When you start a job on the cluster, it's a request to YARN to give you the resources you need.

i.e. the spark-submit command is the way we generally interact with YARN.

YARN

It's easier to move the compute work to the data than to move the data.

YARN tries to access data from a node where the data can be found on HDFS.

YARN

Spark asks YARN: I need to do some work on HDFS files f1, f2, f3. Please give me three cores near them with 2GB each of memory.

Or simply I need to do work on 20 partitions, so please give me 20 cores with 2GB each.

YARN can respond I have two available and start the tasks there.

YARN

You can have a look at what's going on in the HDFS and YARN web front-ends if you have SSH port forwards set up, as the instructions suggest.