Working With Spark

CMPT 353, Fall 2019

Moving Data

Why avoid shuffling? Network is slower than memory.

Tech Size Latency
(cycles)
Throughput
(B/s)
L1 cache <100 kB 4 * 250 G–1 T **
RAM GB 50–100 ** 20 G–50 G *
SSD 100s GB 106 * 500 M (per disk) *
Spinning HD TB 107 * 100 M (per disk) *
Network ∞? 107 120 M (shared?) *

Moving Data

If we want to sort a 1 TB DataFrame, we will have to move most of the data around the cluster to get it in order. So, a rough time estimate:

\[ \frac{1000\ \mathrm{GB} \times 8\ \mathrm{b}/\mathrm{B}}{1\ \mathrm{Gb}/\mathrm{s}} = 8000\ \mathrm{s} \approx 130\ \mathrm{minutes}\,, \]

just to move the data. Assuming 1G Ethernet, blocking; data either in memory or the disks can keep up.

≈13 minutes if you upgrade to 10G Ethernet.

Moving Data

Joining two 1 TB DataFrames will about double that, to get equal keys together.

At some scale, shuffling the data is something you either can't do, or can only do very carefully.

Column Expressions

When doing calculations with Spark DataFrames, we have been writing expressions that operate on columns:

df['col1']
df['col1'] + 1
df['col1'] * df['col2']
(df['col1'] == 1) & (df['col2'].isNull())
functions.length(df['col1'])

Each of these is a Spark column expression, represented as a Column object. Like with DataFrames, these are lazily evaluated.

Column Expressions

Just about everywhere we have done some kind of calculation, it was a column expression.

df.select(colexpr, colexpr, colexpr)
df.filter(colexpr)
df.groupBy(colexpr).agg(colexpr)
df1.join(df2, on=colexpr)

Column Expressions

There are some useful methods on Column objects, and Python operators are overloaded to work on them:

df['col1'] ** 2
df['col1'].isNotNull()
(df['col1'] / 10).alias('onetenth')
df['str'].astype(types.IntegerType())

Column Expressions

Column expressions can be produced by extracting a column from a DataFrame, doing some calculation on a column, or by calling a column function.

df['col1']
df['col1'] - df['col2']
functions.sin(df['col1'])

Column Functions

Many column functions are provided in the pyspark.sql.functions module we have been importing.

from pyspark.sql import SparkSession, functions, types

Column Functions

There is a huge variety of functions in that module.

functions.abs(df['number'])
functions.datediff(df['date1'], df['date2'])
functions.format_string('%d-%s', df['number'], df['label'])
functions.length(df['str'])
functions.concat(df['str1'], df['str2'])

Column Functions

Some of the functions are aggregation functions that are likely to be used near a .groupBy().

groups = df.groupBy(df['col1'])
groups.agg(functions.approx_count_distinct(df['col2']))
groups.agg(functions.countDistinct(df['col2']))
groups.agg(functions.avg(df['col2']))
groups.agg(functions.collect_list(df['col2']))

Column Functions

There seems to be no real consistency in what is a Column method and what is a function, or the naming convention.

df['col1'].between(10, 100)
(df['col1'] >= 10) & (df['col1'] <= 100)

df['col1'].startswith('x')
functions.substring(df['col1'], 0, 1) == 'x'

groups.agg(functions.approx_count_distinct(df['col2']))
groups.agg(functions.countDistinct(df['col2']))

Who Calculates?

When you create a column expression/​DataFrame, and eventually the actual calculation happens, how does it get done?

We have been writing Python code to describe the logic as a DataFrame operation, but that isn't the whole story…

Who Calculates?

For Spark DataFrames, the actual implementation is in Scala, which compiles to the Java Virtual Machine (JVM).

All of our DataFrame data and work has been sent out to some JVM process: our Python code has been doing nothing but building execution plans.

Who Calculates?

There are Spark implementations for other languages: Scala, Java, R.

As long as we stay with DataFrames, there is almost no difference in what happens: same plan, optimization, speed, etc.

There are a few more API options in Scala/​Java because of the type system and ties to the JVM.

Who Calculates?

The good: we get our calculations done at the speed of the underlying JVM implementation (fast). We can switch programming languages more-or-less freely.

The bad: when something goes wrong, the chances of getting a coherent stack trace between two languages, across a cluster, and with lazy evaluation are poor.

User-Defined Functions

If we want to do some calculation that the Spark API can't express (easily), then it would be nice to just write a Python function to do the work.

The data has to be sent into a Python process and back again. That can be done, but isn't cheap.

User-Defined Functions

The function functions.udf will turn a Python function into a user-defined function that can work on Column objects (similar to np.vectorize).

You must specify the returnType, so the execution plan can be built sensibly.

def complicated_function(a, b):
    return a + 2*b  # pretend this is Python-specific logic.

complicated_udf = functions.udf(complicated_function,
                        returnType=types.IntegerType())

User-Defined Functions

Then use it like any other column function:

ints = spark.range(10000)
result = ints.select(
    ints['id'],
    complicated_udf(ints['id'], ints['id']+1).alias('res')
)
result.show(5)
+---+---+
| id|res|
+---+---+
|  0|  2|
|  1|  5|
|  2|  8|
|  3| 11|
|  4| 14|
+---+---+
only showing top 5 rows

User-Defined Functions

Your UDF logic is sent out to the executors. Data is converted from JVM representation to Python, the function is called in a Python process, and result sent back into the JVM.

It's going to be much slower that is than doing arithmetic directly in the JVM. * * *

Spark Python communication

User-Defined Functions

A UDF can be useful to bridge some other Python module into Spark.

e.g. need a DataFrame of RGB values converted to LAB? Option 1: implement the RGB to LAB conversion with DataFrame methods. Option 2: Create a two-line UDF that uses a function from scikit-image or python-colormath.

User-Defined Functions

There is a new option in Spark 2.3: a Vectorized UDFs where you get a Pandas DataFrame of a partition at a time, which can be created efficiently because of Apache Arrow. You do Python work and return the new partition.

Much faster than Python UDFs. Probably still slower than Spark DataFrame logic.

User-Defined Functions

How will they compare? Let's try a simple example.

Remember the first option: do it in Spark DataFrame calculations and never run any Python logic:

res = df.select(
    df['a'] + 2*df['b']*functions.log2(df['a'])
)

User-Defined Functions

But if the computation was much easier to implement with NumPy or Pandas DataFrame operations, we could:

@functions.pandas_udf(returnType=types.DoubleType())
def pandas_logic(a, b):
    return a + 2*b*np.log2(a)
res = df.select(pandas_logic(df['a'], df['b']))

Or with pure Python operations if we must:

@functions.udf(returnType=types.DoubleType())
def python_logic(a, b):
    return a + 2*b*math.log2(a)
res = df.select(python_logic(df['a'], df['b']))

User-Defined Functions

How long?

ImplementationTime
Spark DataFrame operations20 s
Pandas UDF32 s
Python UDF71 s

Others' examples suggest that the differences can be much larger than this. It depends on the calculation.

Or Option 4: find a Java/​Scala library to do work you need, or write the UDF there.

SQL?

There have been some hints that some concepts from SQL are in here somewhere.

  • The original import:
    from pyspark.sql import …
  • Terminology: row, column, UDF.
  • The operations we are doing: .select(), .filter() (== .where()), .groupBy(), .sort() (== .orderBy()), .join().

SQL?

There is no relational database in Spark anywhere, but the ideas are there. Why?

  • SQL was created in 1986 and still in use: it's obviously something useful.
  • It's highly-evolved to do what people actually need to do with data, and is well understood by developers.
  • It is high-level enough to let an optimizer do its job very well. Compiler writers have decades of literature on optimizing SQL to draw from.

SQL?

In some sense, SQL is the ultimate data science language.

As various NoSQL databases matured, a curious thing happened to their APIs: they started looking more like SQL. This is because SQL is a pretty direct implementation of relational set theory, and math is hard to fool. Carlos Bueno, Cache is the new RAM

SQL?

It's actually possible to use SQL syntax (with their SQL dialect) for Spark DataFrame operations.

ints = spark.range(10000)
ints.createOrReplaceTempView('int_table')
result = spark.sql(
    "SELECT id, id+1 AS id1 FROM int_table WHERE id%2 = 0")
result.show(5)
+---+---+
| id|id1|
+---+---+
|  0|  1|
|  2|  3|
|  4|  5|
|  6|  7|
|  8|  9|
+---+---+
only showing top 5 rows

SQL?

The SQL syntax produces the same execution plan as the Python method calls.

result.explain()
result2 = ints.filter(ints['id']%2 == 0) \
    .select(ints['id'], (ints['id']+1).alias('id1'))
result2.explain()
== Physical Plan ==
*(1) Project [id#14L, (id#14L + 1) AS id1#16L]
+- *(1) Filter ((id#14L % 2) = 0)
   +- *(1) Range (0, 10000, step=1, splits=12)
== Physical Plan ==
*(1) Project [id#14L, (id#14L + 1) AS id1#27L]
+- *(1) Filter ((id#14L % 2) = 0)
   +- *(1) Range (0, 10000, step=1, splits=12)

SQL?

A SQL WHERE clause can be used with .filter()/​.where() calls:

result3 = ints.filter('id%2 = 0') \
    .select(ints['id'], (ints['id']+1).alias('id1'))
result3.explain()
== Physical Plan ==
*(1) Project [id#14L, (id#14L + 1) AS id1#31L]
+- *(1) Filter ((id#14L % 2) = 0)
   +- *(1) Range (0, 10000, step=1, splits=12)

SQL?

Personally, I find the SQL syntax easy for simple cases, but much harder for any complicated multi-step computation.

Your call.

Note: you can't .cache() in the SQL syntax: you have to come back to Python and .cache() the DataFrame object. *

RDDs

The high-level API for DataFrames in Spark is great: we can express calculations we want easily; Spark can optimize and plan well; we get the results we needed.

But, there's a lot more happening behind the scenes. Some of it's worth exploring a little: we will have a little more flexibility.

RDDs

The underlying data structure that Spark manages is the RDD or Resilient Distributed Dataset.

An RDD is fundamentally one-dimensional: it holds a collection of whatever values you put into it. Something like a list/​array, but distributed.

RDDs

DataFrames are implemented as a specific type of RDDs: they are an RDD of Row objects. *

We have never seen the Rows explicitly in Python: they are managed by (faster) Scala code but we can get at the Row objects if we want…

RDDs

You can ask for df.rdd to get the RDD:

cities.show()
from pprint import pprint
pprint(cities.rdd.take(5))
+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
|  Toronto|   5928040|5905.71|
| Montreal|   4098927|4604.26|
|  Halifax|    403390|5496.31|
+---------+----------+-------+

[Row(city='Vancouver', population=2463431, area=2878.52),
 Row(city='Calgary', population=1392609, area=5110.21),
 Row(city='Toronto', population=5928040, area=5905.71),
 Row(city='Montreal', population=4098927, area=4604.26),
 Row(city='Halifax', population=403390, area=5496.31)]

RDDs

That RDD contained five values: each a Row object.

When we're working in Python, the RDD is storing Python objects (serialized with pickle). The Scala code doing the work just treats them as an opaque collection of bytes.

RDDs

Therefore, if we want to work on an RDD, we're going to do the work in Python. Things become much more manual.

sc = spark.sparkContext
city_rdd = sc.textFile('cities.csv') # an RDD
pprint(city_rdd.take(6))
['city,population,area',
 'Vancouver,2463431,2878.52',
 'Calgary,1392609,5110.21',
 'Toronto,5928040,5905.71',
 'Montreal,4098927,4604.26',
 'Halifax,403390,5496.31']

Note: data is just six Python strings.

RDDs

We can now do whatever Python logic we want.

def first_field_upper(line):
    return line.split(',')[0].upper()
def is_not_header(city):
    return city != 'CITY'

city_names = city_rdd.map(first_field_upper) \
        .filter(is_not_header)
pprint(city_names.take(6))
['VANCOUVER', 'CALGARY', 'TORONTO', 'MONTREAL', 'HALIFAX']

RDDs

RDD methods we saw there:

  • rdd.take(n): retrieve the first n elements from the RDD as a Python list.
  • rdd.map(f): apply function f to each element, creating a new RDD from the returned values.
  • rdd.filter(f): apply function f to each element, keep rows where it returned True.

Not so different than df.show(n), df.select(…), and df.filter(…), but we had to write more code.

RDDs

Things we can infer:

  • The methods on RDDs and DataFrames are different: they're dealing with different data structures.
  • All DataFrame operations are implemented as Scala/JVM operations, not as Python logic.
  • Operating on RDDs (from Python) will generally be slower than on DataFrames: we lose the JVM speed and the optimizer.
  • Doing df.rdd isn't free: JVM Row objects must be converted to Python Row objects.

RDDs

The default choice for most Spark work will probably still be DataFrames.

The flexibility of RDD can be easier for extracting data in a non-DataFrame-friendly format. e.g.

Canada:5, 6.324
USA:9, 12.310
France:10, 8.9003
Thailand:2, 14.291

We can create an RDD of Rows and then convert that to a DataFrame.

RDDs

Can do arbitrary Python logic with minimal slowdown.

from pyspark.sql import Row
def extract_from_lines(line):
    city, tail = line.split(':')
    i, n = tail.split(', ')
    return Row(city=city, count=int(i), measure=float(n))

lines = spark.sparkContext.textFile('odd_data.txt')
rows = lines.map(extract_from_lines)
data = spark.createDataFrame(rows, schema='city:string, count:int, measure:double')
data.show()
+--------+-----+-------+
|    city|count|measure|
+--------+-----+-------+
|  Canada|    5|  6.324|
|     USA|    9|  12.31|
|  France|   10| 8.9003|
|Thailand|    2| 14.291|
+--------+-----+-------+

RDDs

It was possible to do that with DataFrames functions, but it would have been a pain (but possibly faster).

Or with a DataFrame UDF, but a little harder and probably about the same performance.

Row-Oriented Data

We previously saw that Pandas was column-oriented: the thing stored together in memory was a column (series in Pandas' terminology) from a DataFrame.

From the way Spark deals with DataFrames/​RDDs, we can guess that they are row-oriented.

Row-Oriented Data

Being row-oriented gives some hint why Pandas and Spark DataFrame methods are a little different.

With Spark, it makes more sense to do all of the operations on a row at once, so we tend to be specifying a whole DataFrame operation (like .select() that can use any of the columns).

Row-Oriented Data

Except row-oriented storage is worse: memory layout isn't right for using a processor's SIMD (vector/​SSE) instructions. Memory locality is worse for individual-column operations.

Those need values where you want to do the same operations (usually a whole column) adjacent in memory.

Row-Oriented Data

It turns out this can be optimized away too: the Tungsten optimizer can store columns if it wants. We still see a row-oriented API.

Also, DataFrames being an RDD of Row objects might be a lie: the optimizer might not implement what the API makes us imagine. [That's why Vectorized UDFs are fast.]

Spark ↔ Python

Until now, the data we have been manipulating in Spark has been handled only by Spark, never as a Pandas DataFrame, or NumPy array, or Python list, or anything else.

That's usually good: if we actually have big data, then we should let Spark do what it's good at. But sometimes the big/​small data boundary gets crossed.

Spark ↔ Python

Example where small data becomes big: maybe we have a large data set and want to (broadcast) join a small table we just generated in Python. We would need to turn it into a DataFrame.

Example of big → small: we just did a .groupBy() and only have a few thousand rows left. The next analysis we're doing would be easier in Pandas than Spark.

Spark ↔ Python

The spark.createDataFrame function can take many inputs. We previously saw an RDD of rows, but it can also take a Pandas DataFrame:

import pandas as pd
pd_data = pd.DataFrame([[1,2], [3,4], [5,6]],
                       columns=['width', 'height'])
data = spark.createDataFrame(pd_data)
data.show()
+-----+------+
|width|height|
+-----+------+
|    1|     2|
|    3|     4|
|    5|     6|
+-----+------+

Spark ↔ Python

Or a Python list (or other iterable) of lists (or tuples or Rows or similar):

data = spark.createDataFrame([[1,2], [3,4], [5,6]],
                             schema=['width', 'height'])
data.show()
+-----+------+
|width|height|
+-----+------+
|    1|     2|
|    3|     4|
|    5|     6|
+-----+------+

Spark ↔ Python

In the other direction, we can turn a Spark DataFrame (partitioned across the executors) into a Pandas DataFrame (in the driver process) with .toPandas():

pd_data = data.toPandas()
print(pd_data)
print(type(pd_data))
   width  height
0      1       2
1      3       4
2      5       6
<class 'pandas.core.frame.DataFrame'>

Spark ↔ Python

Or .collect() to create a Python list of Row objects (also in the driver process):

py_list = data.collect()
print(py_list)
print(type(py_list))
[Row(width=1, height=2), Row(width=3, height=4), Row(width=5, height=6)]
<class 'list'>

Spark ↔ Python

Same rule as decreasing paritions but stronger: if you bring a DataFrame into the driver process, you must include a comment justifying why it is a safe thing to do. You should say something about an upper-bound for the possible inputs.

Remember: from driver to Spark, the data must be distributed/​partitioned to the executors. From Spark, it must fit in memory in the driver process.

More Spark I/O

We have worked with a few input/​output methods in Spark:

  • CSV and JSON (one object per line) with spark.read and df.write.
  • Lines from a file with spark.read.text to a DataFrame or spark.sparkContext.textFile to an RDD; from an DataFrame with df.write.text.
  • To/from the driver process as lists/​Pandas DataFrames.

More Spark I/O

There are other formats that spark.read and df.write can handle:

  • JDBC: any database the Java process can connect to.
  • ORC files: native format of Apache Hive
  • Parquet files: more next…

Or other things handled by additional Spark packages.

More Spark I/O

Parquet is a column-based file format that is designed to store tabular data, just like a Spark DataFrame.

Because it's column-oriented, Spark can read only some columns from the files in a very efficient way.

More Spark I/O

It might be sensible to think of Parquet as a very efficient intermediate format.

Perhaps: as part of an ETL process, take the files you have, load into Spark, do some simple processing, write Parquet files compressed with a fast compression algorithm. That can be read very quickly as you build your next processing pipelines steps.

Pandas can read and write Parquet (as of 0.21).

More Spark I/O

Speaking of compression… We have been using no compression (for small files) or gzip because they work everywhere.

There are several compression algorithms designed to be much faster (but produce slightly larger files): LZ4, Snappy, LZO.

Spark can use these while moving data around, or even when caching.

Big Data is annoying.

Remember the Baker Definition of Big Data™®©?

Big data: when you have so much data, it's annoying.

Let me explain a little more…

Big Data is annoying.

The Reddit Comment Corpus is distributed as about 300 GB of bzip2-compressed JSON. Here are some times (based on rough speed measurements on my Haswell i7 desktop):

  • Read from a spinning disk: ≈130 MB/s → 39 minutes.
  • Read from an (oddly slow?) SSD: ≈280 MB/s → 18 minutes.
  • Uncompress bzip2: ≈7.2 MB/s → 12 hours (single-threaded).

No computation: this is just to move/​read the data.

Big Data is annoying.

In theory, while it's compressed:

  • Transfer over 1G Ethernet: 125 MB/s → 40 minutes.
  • Transfer over 10G Ethernet: 1.25 GB/s → 4.0 minutes.
  • Copy to USB2 external disk: 35 MB/s → 146 minutes.

Roughly: doing anything with that data set takes about an hour (since there will be some overhead, and I don't have 10G to my desktop).

That's annoying, and 300 GB isn't really that big.

Being Less Annoying

Bzip2 wasn't the best way to compress if we're working with the data (as opposed to downloading it).

  • Uncompressed: ≈1800 GB
  • Uncompress 300 GB bzip2: ≈7.2 MB/s → 12 hours
  • Uncompress ≈440 GB gzip: ≈54 MB/s → 140 minutes
  • Uncompress ≈690 GB LZ4: ≈550 MB/s → 21 minutes

(All single-threaded decompressors.)

Being Less Annoying

My best guess on relative speeds, based on this extreeeeemely scientific study:

  1. transfer over 10G Ethernet
  2. uncompress LZ4 (or Snappy or LZO)
  3. read from SSD
  4. read from HDD
  5. transfer over 1G Ethernet
  6. uncompress gzip
  7. uncompress bzip2

Choice of where/how we store the data really matters when it's big.

Being Less Annoying

Some work to prep the data would help. Suppose we already un- or recompressed the JSON data. Then…

  • Uncompressed from SSD: ≈1800 GB → 110 minutes.
  • Gzip from SSD: ≈440 GB → 27 minutes (read), 140 minutes (decompress) → 140 minutes.
  • LZ4 from SSD: ≈690 GB → 42 minutes (read), 21 minutes (decompress) → 42 minutes.

… but partitioning and using multiple cores to decompress would help the gzip case.

Being Less Annoying

A compute cluster also helps.

The same data on our cluster: 440 GB gzip partitioned nicely, and an array of SSDs over 10G Ethernet.

Doing anything with it takes at least 8 minutes (in the best case: full cluster, easy calculations). It seems to be processor-bound on the decompression + JSON decoding + minimal Spark work.

How To Big Data?

If you really need big data tools in the future and don't have a cluster sitting around, what do you do?

How To Big Data?

Step 1: get a cluster.

You could buy hardware, but that's almost certainly inefficient: you don't need it 24/365, so it will often be sitting idle. Also, you'd have to configure/​maintain it.

Much better solution: rent.

How To Big Data?

Step 2: set it up.

Most of the cloud solutions are pre-configured. If you're using EC2 or similar, there are nice Hadoop distributions.

When to Big Data?

If I can buy a phone with 8 cores, 8 GB of RAM and 512 GB of storage, anything less than that must not be big.

One computer can have 100s of GB of memory and several TB of storage. Is anything less than that small?

Maybe, but does that mean you can't use the big data tools?

When to Big Data?

Perhaps your opinion matches mine: Spark is (usually) only slightly harder to work with than Pandas, and (usually) easier than manually manipulating data.

Using Spark isn't that bad.

When to Big Data?

And Spark works in parallel for free: if most of the work can be done in parallel (i.e. is pipeline operations), it will complete almost \(n\) times faster on an \(n\) thread CPU.

If that isn't fast enough, I have some confidence that I can speed it up more if I move to a cluster.

When to Big Data?

So, if there's a possibility of big data in the future, I might choose to start with Spark.

Maybe big data doesn't actually have to be that big.

… but we're always going to wait ≈10 s for a Spark job to start up. That might keep me with Pandas for truly-small data. Spark is good at medium data?

When to Big Data?

It might also be useful to think of Spark as a really flexible ETL tool.

Even when you have big data, often the first thing you want to do is aggregate/​filter/​summarize to get what you actually need to analyse further, which is a much smaller data set.

When to Big Data?

Spark can do this nicely:

data = spark.read.…
etl_data = data.filter(…).select(…).groupBy(…).agg(…)
etl_data = etl_data.coalesce(1) # at most ??? records remain
etl_data.write.json('smaller_data', compression='gzip')

Then if it's on HDFS:

hdfs dfs -copyToLocal smaller_data .

When to Big Data?

Then move to Pandas, and do something similar to what we have seen before:

import pandas as pd
import gzip, glob
etl_filename = glob.glob('smaller_data/part-*.json.gz')[0]
etl_data = pd.read_json(etl_filename, lines=True)
⋮

More Big Data?

Yeah sure, if you want.

Or, more data science.