Why avoid shuffling? Network is slower than memory.
If we want to sort a 1 TB DataFrame, we will have to move most of the data around the cluster to get it in order. So, a rough time estimate:
\[ \frac{1000\ \mathrm{GB} \times 8\ \mathrm{b}/\mathrm{B}}{1\ \mathrm{Gb}/\mathrm{s}} = 8000\ \mathrm{s} \approx 130\ \mathrm{minutes}\,, \]
just to move the data. This assumes 1G Ethernet with no blocking, and that the data is either in memory or the disks can keep up.
≈13 minutes if you upgrade to 10G Ethernet.
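The same back-of-the-envelope arithmetic as a few lines of Python, if you want to play with the assumptions (the numbers are just the ones above, not measurements):

data_gb = 1000        # DataFrame size in GB
link_gbps = 1         # network speed in Gb/s: 1G Ethernet
seconds = data_gb * 8 / link_gbps
print(seconds / 60)   # ≈133 minutes; divide by 10 again for 10G Ethernet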
Joining two 1 TB DataFrames will roughly double that, since both tables must be shuffled to bring equal keys together.
At some scale, shuffling the data is something you either can't do, or can only do very carefully.
When doing calculations with Spark DataFrames, we have been writing expressions that operate on columns:
df['col1']
df['col1'] + 1
df['col1'] * df['col2']
(df['col1'] == 1) & (df['col2'].isNull())
functions.length(df['col1'])
Each of these is a Spark column expression, represented as a Column object. Like DataFrames, these are lazily evaluated.
Just about everywhere we have done some kind of calculation, it was a column expression.
df.select(colexpr, colexpr, colexpr)
df.filter(colexpr)
df.groupBy(colexpr).agg(colexpr)
df1.join(df2, on=colexpr)
There are some useful methods on Column objects, and Python operators are overloaded to work on them:
df['col1'] ** 2
df['col1'].isNotNull()
(df['col1'] / 10).alias('onetenth')
df['str'].astype(types.IntegerType())
Column expressions can be produced by extracting a column from a DataFrame, doing some calculation on a column, or by calling a column function.
df['col1']
df['col1'] - df['col2']
functions.sin(df['col1'])
Many column functions are provided in the pyspark.sql.functions module we have been importing.
from pyspark.sql import SparkSession, functions, types
There is a huge variety of functions in that module.
functions.abs(df['number'])
functions.datediff(df['date1'], df['date2'])
functions.format_string('%d-%s', df['number'], df['label'])
functions.length(df['str'])
functions.concat(df['str1'], df['str2'])
Some of the functions are aggregation functions that are likely to be used near a .groupBy().
groups = df.groupBy(df['col1'])
groups.agg(functions.approx_count_distinct(df['col2']))
groups.agg(functions.countDistinct(df['col2']))
groups.agg(functions.avg(df['col2']))
groups.agg(functions.collect_list(df['col2']))
There seems to be no real consistency in what is a Column method and what is a function, or in the naming conventions.
df['col1'].between(10, 100)
(df['col1'] >= 10) & (df['col1'] <= 100)
df['col1'].startswith('x')
functions.substring(df['col1'], 0, 1) == 'x'
groups.agg(functions.approxCountDistinct(df['col2']))
groups.agg(functions.approx_count_distinct(df['col2']))
groups.agg(functions.countDistinct(df['col2']))
groups.agg(functions.count_distinct(df['col2']))
When you create a column expression or DataFrame and the actual calculation eventually happens, how does it get done?
We have been writing Python code to describe the logic as a DataFrame operation, but that isn't the whole story…
For Spark DataFrames, the actual implementation is in Scala, which compiles to the Java Virtual Machine (JVM).
All of our DataFrame data and work has been sent out to some JVM process: our Python code has been doing nothing but building execution plans.
There are Spark implementations for other languages: Scala, Java, R.
As long as we stay with DataFrames, there is almost no difference in what happens: same plan, optimization, speed, etc.
There are a few more API options in Scala/Java because of the type system and ties to the JVM (e.g. the Dataset type).
The good: we get our calculations done at the speed of the underlying JVM implementation (fast). We can switch programming languages more-or-less freely.
The bad: when something goes wrong, the chances of getting a coherent stack trace between two languages, across a cluster, and with lazy evaluation are poor.
If we want to do some calculation that the Spark API can't express (easily), then it would be nice to just write a Python function to do the work.
The data has to be sent into a Python process and back again. That can be done, but isn't cheap.
The function functions.udf will turn a Python function into a user-defined function that can work on Column objects (similar to np.vectorize).
You must specify the returnType, so the execution plan can be built sensibly.
def complicated_function(a, b):
    return a + 2*b  # pretend this is Python-specific logic.

complicated_udf = functions.udf(complicated_function, returnType=types.IntegerType())
Then use it like any other column function:
ints = spark.range(10000)
result = ints.select(
    ints['id'],
    complicated_udf(ints['id'], ints['id']+1).alias('res')
)
result.show(5)
+---+---+
| id|res|
+---+---+
|  0|  2|
|  1|  5|
|  2|  8|
|  3| 11|
|  4| 14|
+---+---+
only showing top 5 rows
Your UDF logic is sent out to the executors. Data is converted from the JVM representation to Python, the function is called in a Python process, and the result is sent back into the JVM.
That's going to be much slower than doing the arithmetic directly in the JVM.
A UDF can be useful to bridge some other Python module into Spark.
e.g. need a DataFrame of RGB values converted to LAB? Option 1: implement the RGB to LAB conversion with DataFrame methods. Option 2: Create a two-line UDF that uses a function from scikit-image or python-colormath.
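A sketch of Option 2, assuming scikit-image is available on the executors and a DataFrame colours with integer columns r, g, b (all of these names are hypothetical):

import numpy as np
from skimage import color
from pyspark.sql import functions, types

@functions.udf(returnType=types.ArrayType(types.DoubleType()))
def rgb_to_lab(r, g, b):
    # scikit-image expects floats in [0, 1] and returns L*a*b* values.
    lab = color.rgb2lab(np.array([[[r/255, g/255, b/255]]]))
    return [float(x) for x in lab[0, 0]]

result = colours.select(rgb_to_lab(colours['r'], colours['g'], colours['b']).alias('lab'))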
In Spark (≥2.3 and expanded in 3.0), you can use Vectorized UDFs where you get a Pandas DataFrame of a partition at a time, which can be created efficiently because of Apache Arrow. You do Python work and return the new partition.
Much faster than Python UDFs. Probably still slower than Spark DataFrame logic.
How will they compare? Let's try a simple example.
Remember the first option: do it in Spark DataFrame calculations and never run any Python logic:
res = df.select(
    df['a'] + 2*df['b']*functions.log2(df['a'])
)
But if the computation was much easier to implement with NumPy or Pandas DataFrame operations, we could:
import numpy as np
import pandas as pd

@functions.pandas_udf(returnType=types.DoubleType())
def pandas_logic(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + 2*b*np.log2(a)

res = df.select(pandas_logic(df['a'], df['b']))
Or with pure Python operations if we must:
import math

@functions.udf(returnType=types.DoubleType())
def python_logic(a: float, b: float) -> float:
    return a + 2*b*math.log2(a)

res = df.select(python_logic(df['a'], df['b']))
How long?
| Implementation             | Time |
|----------------------------|------|
| Spark DataFrame operations | 20 s |
| Pandas UDF                 | 32 s |
| Python UDF                 | 71 s |
Others' examples suggest that the differences can be much larger than this. It depends on the calculation.
Or Option 4: find a Java/Scala library to do work you need, or write the UDF there.
There have been some hints that some concepts from SQL are in here somewhere.
from pyspark.sql import …
.select(), .filter() (== .where()), .groupBy(), .sort() (== .orderBy()), .join()
There is no relational database in Spark anywhere, but the ideas are there. Why?
In some sense, SQL is the ultimate data science language.
As various NoSQL databases matured, a curious thing happened to their APIs: they started looking more like SQL. This is because SQL is a pretty direct implementation of relational set theory, and math is hard to fool.
— Carlos Bueno, Cache is the new RAM
It's actually possible to use SQL syntax (in Spark's SQL dialect) for Spark DataFrame operations.
ints = spark.range(10000)
ints.createOrReplaceTempView('int_table')
result = spark.sql(
    "SELECT id, id+1 AS id1 FROM int_table WHERE id%2 = 0")
result.show(5)
+---+---+
| id|id1|
+---+---+
|  0|  1|
|  2|  3|
|  4|  5|
|  6|  7|
|  8|  9|
+---+---+
only showing top 5 rows
The SQL syntax produces the same execution plan as the Python method calls.
result.explain()

result2 = ints.filter(ints['id']%2 == 0) \
    .select(ints['id'], (ints['id']+1).alias('id1'))
result2.explain()
== Physical Plan ==
*(1) Project [id#14L, (id#14L + 1) AS id1#16L]
+- *(1) Filter ((id#14L % 2) = 0)
   +- *(1) Range (0, 10000, step=1, splits=12)

== Physical Plan ==
*(1) Project [id#14L, (id#14L + 1) AS id1#27L]
+- *(1) Filter ((id#14L % 2) = 0)
   +- *(1) Range (0, 10000, step=1, splits=12)
A SQL WHERE expression can be used in .filter()/.where() calls:
result3 = ints.filter('id%2 = 0') \
    .select(ints['id'], (ints['id']+1).alias('id1'))
result3.explain()
== Physical Plan ==
*(1) Project [id#14L, (id#14L + 1) AS id1#31L]
+- *(1) Filter ((id#14L % 2) = 0)
   +- *(1) Range (0, 10000, step=1, splits=12)
Personally, I find the SQL syntax easy for simple cases, but much harder for any complicated multi-step computation.
Your call.
The high-level API for DataFrames in Spark is great: we can express calculations we want easily; Spark can optimize and plan well; we get the results we needed.
But, sometimes we would like a little more flexibility.
The other fundamental data structure Spark has is the RDD or Resilient Distributed Dataset.
An RDD is fundamentally one-dimensional: it holds a collection of whatever values you put into it. Something like a list/array, but distributed.
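A minimal sketch of working with one directly (the values here are just toy data):

sc = spark.sparkContext
nums = sc.parallelize([1, 2, 3, 4, 5])      # a small RDD of Python ints
print(nums.map(lambda x: x*x).collect())    # [1, 4, 9, 16, 25]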
You could think of Spark DataFrames as RDDs of Row objects.
We have never seen the Rows explicitly in Python: they are managed by (faster) Scala code, but we can get at the Row objects if we want…
You can ask for df.rdd to get an equivalent RDD:
cities.show()

from pprint import pprint
pprint(cities.rdd.take(5))
+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
|  Toronto|   5928040|5905.71|
| Montreal|   4098927|4604.26|
|  Halifax|    403390|5496.31|
+---------+----------+-------+

[Row(city='Vancouver', population=2463431, area=2878.52),
 Row(city='Calgary', population=1392609, area=5110.21),
 Row(city='Toronto', population=5928040, area=5905.71),
 Row(city='Montreal', population=4098927, area=4604.26),
 Row(city='Halifax', population=403390, area=5496.31)]
That RDD contained five values: each a Row object.
When we're working in Python, the RDD is storing Python objects (serialized with pickle). The Scala code doing the work just treats them as an opaque collection of bytes.
Therefore, if we want to work on an RDD, we're going to do the work in Python. Things become much more manual.
sc = spark.sparkContext
city_rdd = sc.textFile('cities.csv')  # an RDD
pprint(city_rdd.take(6))
['city,population,area',
 'Vancouver,2463431,2878.52',
 'Calgary,1392609,5110.21',
 'Toronto,5928040,5905.71',
 'Montreal,4098927,4604.26',
 'Halifax,403390,5496.31']
Note: the data here is just six Python strings.
We can now do whatever Python logic we want.
def first_field_upper(line):
    return line.split(',')[0].upper()

def is_not_header(city):
    return city != 'CITY'

city_names = city_rdd.map(first_field_upper) \
    .filter(is_not_header)
pprint(city_names.take(6))
['VANCOUVER', 'CALGARY', 'TORONTO', 'MONTREAL', 'HALIFAX']
RDD methods we saw there:
rdd.take(n): retrieve the first n elements from the RDD as a Python list.
rdd.map(f): apply function f to each element, creating a new RDD from the returned values.
rdd.filter(f): apply function f to each element, keeping the elements where it returned True.
Not so different from df.show(n), df.select(…), and df.filter(…), but we had to write more code.
Things we can infer:
df.rdd isn't free: JVM objects must be converted to Python Row objects.
The default choice for most Spark work will probably still be DataFrames.
The flexibility of RDDs can make it easier to extract data that's in a non-DataFrame-friendly format, e.g.
Canada:5, 6.324
USA:9, 12.310
France:10, 8.9003
Thailand:2, 14.291
We can create an RDD of Rows and then convert that to a DataFrame.
Can do arbitrary Python logic with minimal slowdown.
from pyspark.sql import Row

def extract_from_lines(line):
    city, tail = line.split(':')
    i, n = tail.split(', ')
    return Row(city=city, count=int(i), measure=float(n))

lines = spark.sparkContext.textFile('odd_data.txt')
rows = lines.map(extract_from_lines)
data = spark.createDataFrame(rows, schema='city:string, count:int, measure:double')
data.show()
+--------+-----+-------+
|    city|count|measure|
+--------+-----+-------+
|  Canada|    5|  6.324|
|     USA|    9|  12.31|
|  France|   10| 8.9003|
|Thailand|    2| 14.291|
+--------+-----+-------+
It would have been possible to do that with DataFrame functions, but it would have been a pain (though possibly faster).
Or with a DataFrame UDF, which would have been a little harder and given about the same performance.
We previously saw that Pandas is column-oriented: the thing stored together in memory is a column (a Series in Pandas' terminology) from a DataFrame.
From the way Spark deals with RDDs, we can guess that they are row-oriented.
Except row-oriented storage is worse for this kind of work: the memory layout isn't right for a processor's SIMD (vector/SSE) instructions, and memory locality is worse for individual-column operations.
Those need the values you operate on together (usually a whole column, or a partition of a column) to be adjacent in memory.
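The effect isn't Spark-specific. A rough illustration comparing the same arithmetic on values adjacent in memory (a NumPy array) and on a list of boxed Python objects; much of the gap here is interpreter overhead, but the memory layout is part of it:

import timeit
import numpy as np

values = np.random.rand(1_000_000)
as_list = values.tolist()

print(timeit.timeit(lambda: values * 2 + 1, number=10))                # contiguous, vectorizable
print(timeit.timeit(lambda: [v * 2 + 1 for v in as_list], number=10))  # one Python object at a time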
Spark DataFrames are column-oriented in memory: it's just better. You could think of one partition of a Spark DataFrame as a Pandas DataFrame.
That's why Vectorized UDFs are fast: the Pandas-like data is sitting there in memory.
Until now, the data we have been manipulating in Spark has been handled only by Spark, never as a Pandas DataFrame, or NumPy array, or Python list, or anything else.
That's usually good: if we actually have big data, then we should let Spark do what it's good at. But sometimes the big/small data boundary gets crossed.
Example where small data becomes big: maybe we have a large data set and want to (broadcast) join a small table we just generated in Python. We would need to turn it into a DataFrame.
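A sketch of that direction, assuming a large DataFrame big_df with an 'abbrev' column (the names and values here are hypothetical):

lookup = spark.createDataFrame(
    [('BC', 'British Columbia'), ('AB', 'Alberta'), ('ON', 'Ontario')],
    schema=['abbrev', 'province'])
joined = big_df.join(functions.broadcast(lookup), on='abbrev')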
Example of big → small: we just did a .groupBy() and only have a few thousand rows left. The next analysis we're doing would be easier in Pandas than Spark.
The spark.createDataFrame function can take many inputs. We previously saw an RDD of Rows, but it can also take a Pandas DataFrame:
import pandas as pd

pd_data = pd.DataFrame([[1,2], [3,4], [5,6]], columns=['width', 'height'])
data = spark.createDataFrame(pd_data)
data.show()
+-----+------+
|width|height|
+-----+------+
|    1|     2|
|    3|     4|
|    5|     6|
+-----+------+
Or a Python list (or other iterable) of lists (or tuples or Rows or similar):
data = spark.createDataFrame([[1,2], [3,4], [5,6]],
    schema=['width', 'height'])
data.show()
+-----+------+
|width|height|
+-----+------+
|    1|     2|
|    3|     4|
|    5|     6|
+-----+------+
In the other direction, we can turn a Spark DataFrame (partitioned across the executors) into a Pandas DataFrame (in the driver process) with .toPandas():
pd_data = data.toPandas()
print(pd_data)
print(type(pd_data))
   width  height
0      1       2
1      3       4
2      5       6
<class 'pandas.core.frame.DataFrame'>
Or .collect() to create a Python list of Row objects (also in the driver process):
py_list = data.collect()
print(py_list)
print(type(py_list))
[Row(width=1, height=2), Row(width=3, height=4), Row(width=5, height=6)]
<class 'list'>
Same rule as decreasing partitions, but stronger: if you bring a DataFrame into the driver process, you must include a comment justifying why it is a safe thing to do. You should say something about an upper bound for the possible inputs.
Remember: going from the driver to Spark, the data must be distributed/partitioned out to the executors. Coming back from Spark, it must fit in memory in the driver process.
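For example, the justifying comment might look something like this (the DataFrame and column names are hypothetical):

daily = big_df.groupBy('date').agg(functions.sum('amount').alias('total'))
# Safe to bring to the driver: at most one row per calendar day in the input,
# so a few thousand rows even for a decade of data.
daily_pd = daily.toPandas()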
We have worked with a few input/output methods in Spark:
spark.read and df.write.
Text files: spark.read.text to a DataFrame, or spark.sparkContext.textFile to an RDD; written from a DataFrame with df.write.text.
There are other formats that spark.read and df.write can handle: JSON, CSV, Parquet, JDBC databases, and more.
Or other things handled by additional Spark packages.
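A quick sketch of a few of the built-in formats (the paths here are hypothetical):

df = spark.read.csv('input.csv', header=True, inferSchema=True)
df.write.json('output-json', compression='gzip')
df.write.parquet('output-parquet')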
Parquet is a column-based file format that is designed to store tabular data, just like a Spark DataFrame.
Because it's column-oriented, Spark can read only some columns from the files in a very efficient way.
It might be sensible to think of Parquet as a very efficient intermediate format.
Perhaps: as part of an ETL process, take the files you have, load them into Spark, do some simple processing, and write Parquet files compressed with a fast compression algorithm. Those can be read very quickly as you build the next steps of your processing pipeline.
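A sketch of that kind of pipeline; the input format, paths, and transformations are just placeholders:

raw = spark.read.json('raw_input')
cleaned = raw.filter(raw['value'].isNotNull()).select('key', 'value')
cleaned.write.parquet('intermediate', compression='snappy')

# Later steps start from the fast-to-read Parquet copy,
# and can read only the columns they need:
subset = spark.read.parquet('intermediate').select('key')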
Speaking of compression… We have been using no compression (for small files) or gzip because they work everywhere.
There are several compression algorithms designed to be much faster (but produce slightly larger files): LZ4, Snappy, LZO.
Spark can use these while moving data around, or even when caching.
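These are controlled by configuration. For example, spark.io.compression.codec selects the codec Spark uses for shuffle and cached data; lz4 is already the default in recent versions, so treat this as an illustration rather than a recommendation:

spark = SparkSession.builder \
    .appName('compression example') \
    .config('spark.io.compression.codec', 'lz4') \
    .getOrCreate()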
Remember the Baker Definition of Big Data™®©?
Big data: when you have so much data, it's annoying.
Let me explain a little more…
The Reddit Comment Corpus was distributed as about 1400 GB of bzip2-compressed JSON. Here are some times (based on rough speed measurements on an older desktop):
No computation: this is just to move/read the data.
In theory, while it's compressed:
Roughly: doing anything with that data set takes about a couple of hours.
That's annoying, and 1400 GB isn't really that big.
Bzip2 wasn't the best way to compress if we're working with the data (as opposed to downloading it).
(All single-threaded decompressors.)
My best guess on relative speeds, based on this very thin evidence:
Choice of where/how we store the data really matters when it's big.
Some work to prep the data would help. Suppose we already un- or recompressed the JSON data. Then…
… but partitioning and using multiple cores to decompress would help.
A compute cluster also helps.
The same data on our cluster: 2 TB gzip partitioned nicely, and an array of SSDs over 10G Ethernet.
Doing anything with it takes around an hour. It seems to be processor-bound on the decompression + JSON decoding + minimal Spark work.
If you really need big data tools in the future and don't have a cluster sitting around, what do you do?
Step 1: get a cluster.
You could buy hardware, but that's almost certainly inefficient: you don't need it 24/365, so it will often be sitting idle. Also, you'd have to configure/maintain it.
Much better solution: rent.
Step 2: set it up.
Most of the cloud solutions are pre-configured. If you're using EC2 or similar, there are nice Hadoop distributions.
If I can buy a phone with 6 cores, 6 GB of RAM, and 1 TB of storage, anything less than that must not be "big".
One computer can have 100s of GB of memory and several TB of storage. Is anything less than that "small"?
Maybe, but does that mean you can't use the big data tools?
Perhaps your opinion matches mine: Spark is (usually) only slightly harder to work with than Pandas, and (usually) easier than manually manipulating data.
Using Spark isn't that bad.
And Spark works in parallel for free: if most of the work can be done in parallel (i.e. consists of pipeline operations), it will complete almost \(n\) times faster on an \(n\)-thread CPU.
If that isn't fast enough, I have some confidence that I can speed it up more if I move to a cluster.
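For example, the same program can run on all local threads or be handed to a cluster without code changes; only the way the job is started differs (a sketch, with an example master URL):

from pyspark.sql import SparkSession

# All CPU threads on one machine:
spark = SparkSession.builder.master('local[*]').appName('example').getOrCreate()

# The identical script can later be submitted to a cluster, e.g.:
#   spark-submit --master yarn my_job.py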
It's also possible to use the Pandas API on Spark, if you have existing Pandas code and want to migrate to Spark.
But caution: most of the interesting things we have seen Spark do are obscured by the Pandas API.
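A minimal sketch of what it looks like (Spark ≥ 3.2; reusing the cities.csv example from earlier):

import pyspark.pandas as ps

psdf = ps.read_csv('cities.csv')    # a pandas-on-Spark DataFrame
print(psdf['population'].sum())     # Pandas-style API, Spark execution underneath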
So, if there's a possibility of big data in the future, I might choose to start with Spark.
Maybe "big data" doesn't actually have to be that big.
… but we're always going to wait ≈5 s for a Spark job to start up. That might keep me with Pandas for truly-small data. Spark is good at "medium data"?
It might also be useful to think of Spark as a really flexible ETL tool.
Even when you have big data, often the first thing you want to do is aggregate/filter/summarize to get what you actually need to analyse further, which is a much smaller data set.
Spark can do this nicely:
data = spark.read.…
etl_data = data.filter(…).select(…).groupBy(…).agg(…)
etl_data = etl_data.coalesce(1)  # at most ??? records remain
etl_data.write.json('smaller_data', compression='gzip')
Then if it's on HDFS:
hdfs dfs -copyToLocal smaller_data .
Then move to Pandas, and do something similar to what we have seen before:
import pandas as pd
import gzip, glob

etl_filename = glob.glob('smaller_data/part-*.json.gz')[0]
etl_data = pd.read_json(etl_filename, lines=True)
⋮
Yeah sure, if you want.
Or, more data science.