CMPT 732, Fall 2024
Observations from the semester:
Let's look at a few more DataFrame tools that can be used in Python. They could variously be considered "big data" tools: some work on partitioned data, most don't distribute computation.
For each one, I'll reproduce the weather ETL problem from Assignment 4.
Plus I'll convert the dates (i.e. "20131231" to a real date type) and aggregate by date to find the maximum Canadian temperature on each day.
Pandas is the original DataFrame library for Python (as far as I know). It's the one where you'll find the most docs/tutorials/Stack Overflow answers.
Just about every other Python data tool is interoperable with Pandas: if you can imagine it, there's a way to convert to/from a Pandas DataFrame.
But it's the opposite of what I'd expect in a "big data" tool:
Pandas can't read the directory-of-partitions style input: I had to manually concatenate them before reading:
import pandas as pd

colnames = ['station', 'date', 'observation', 'value', 'mflag', 'qflag', 'sflag', 'obstime']
weather = pd.read_csv(input, header=None, names=colnames, dtype={'date': str})
Without the dtype argument, the dates (like "20131231") got detected as integers.
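One way to avoid the manual concatenation step is to read each partition separately and combine the pieces with pd.concat. This is only a sketch: read_partitions is a hypothetical helper (not a Pandas function), it assumes every file in the directory is a CSV in the layout above, and it still holds the whole data set in memory. It relies on read_csv's default behaviour of inferring compression from a file extension such as .gz.

```python
import glob
import os

import pandas as pd

COLNAMES = ['station', 'date', 'observation', 'value',
            'mflag', 'qflag', 'sflag', 'obstime']


def read_partitions(input_dir):
    """Read every file in input_dir and concatenate them into one DataFrame.

    Hypothetical helper for illustration; assumes all files share one layout.
    """
    paths = sorted(glob.glob(os.path.join(input_dir, '*')))
    parts = [
        # dtype={'date': str} keeps "20131231" as text instead of an integer.
        pd.read_csv(path, header=None, names=COLNAMES, dtype={'date': str})
        for path in paths
    ]
    return pd.concat(parts, ignore_index=True)
```

Usage would be something like weather = read_partitions(input), after which the rest of the Pandas code is unchanged.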
The filtering spelled with Pandas:
good_rows = weather[
    weather['qflag'].isnull()
    & weather['station'].str.startswith('CA')
    & (weather['observation'] == 'TMAX')
]
The selecting/calculation, doing my best to calculate in-place to not waste memory:
filtered = good_rows[['station', 'date', 'value']].copy()
filtered['date'] = pd.to_datetime(filtered['date'], format='%Y%m%d')
filtered['value'] = filtered['value'] / 10
filtered.rename(columns={'value': 'tmax'}, inplace=True)
And the addition: max temp by day. Then output newline-delimited JSON.
maxtemp = filtered.groupby('date').agg({'tmax': 'max'})
maxtemp = maxtemp.reset_index().sort_values('date')
maxtemp = maxtemp.rename(columns={'tmax': 'max_temp'})
maxtemp['date'] = maxtemp['date'].dt.strftime('%Y-%m-%d')
maxtemp.to_json(output, orient='records', lines=True)
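To check the logic without the full data set, the same steps can be run on a handful of rows. The sample rows below are made up for illustration (same column layout as the weather files), and the output is captured as a string by calling to_json without a path.

```python
import io
import json

import pandas as pd

# Hypothetical sample rows, not real observations.
SAMPLE = """\
CA001,20131231,TMAX,-50,,,C,
CA002,20131231,TMAX,-30,,,C,
US001,20131231,TMAX,100,,,C,
CA002,20140101,TMAX,25,,G,C,
CA003,20140101,TMAX,10,,,C,
"""

colnames = ['station', 'date', 'observation', 'value',
            'mflag', 'qflag', 'sflag', 'obstime']
weather = pd.read_csv(io.StringIO(SAMPLE), header=None, names=colnames,
                      dtype={'date': str})

# Keep unflagged Canadian TMAX observations only.
good_rows = weather[
    weather['qflag'].isnull()
    & weather['station'].str.startswith('CA')
    & (weather['observation'] == 'TMAX')
]
filtered = good_rows[['station', 'date', 'value']].copy()
filtered['date'] = pd.to_datetime(filtered['date'], format='%Y%m%d')
filtered['value'] = filtered['value'] / 10  # tenths of a degree to degrees
filtered = filtered.rename(columns={'value': 'tmax'})

maxtemp = filtered.groupby('date').agg({'tmax': 'max'})
maxtemp = maxtemp.reset_index().sort_values('date')
maxtemp = maxtemp.rename(columns={'tmax': 'max_temp'})
maxtemp['date'] = maxtemp['date'].dt.strftime('%Y-%m-%d')

# With no path given, to_json returns the newline-delimited JSON as a string.
ndjson = maxtemp.to_json(orient='records', lines=True)
records = [json.loads(line) for line in ndjson.splitlines()]
```

Here records holds one dictionary per day, with the flagged CA002 row and the US row excluded from the maximum.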
The result: it takes about 15 seconds to complete this on our weather-3 data set, and it uses almost 9 GB of memory to do it (measured by /usr/bin/time -v).
For comparison: my Spark DataFrame implementation takes 3.6 s and needs just over 1 GB of memory.
Spark's lazy evaluation means we don't calculate/store intermediate results, and the whole computation can be optimized. Also it uses all available CPU cores to do the work.
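As a loose analogy only (this is plain Python, not Spark, and it says nothing about Spark's optimizer or parallelism), the difference between storing every intermediate result and streaming rows through the whole chain can be sketched with lists versus generators. The rows and function names are hypothetical.

```python
# Hypothetical sample rows: (station, date, observation, value, qflag)
ROWS = [
    ('CA001', '20131231', 'TMAX', -50, None),
    ('CA002', '20131231', 'TMAX', -30, None),
    ('US001', '20131231', 'TMAX', 100, None),
    ('CA001', '20140101', 'TMIN', -120, None),
    ('CA002', '20140101', 'TMAX', 25, 'G'),
    ('CA003', '20140101', 'TMAX', 10, None),
]


def keep(row):
    """Same filter as the weather ETL: unflagged Canadian TMAX rows."""
    station, _, observation, _, qflag = row
    return qflag is None and station.startswith('CA') and observation == 'TMAX'


def max_by_date(pairs):
    """Aggregate (date, tmax) pairs into {date: maximum tmax}."""
    result = {}
    for date, tmax in pairs:
        if date not in result or tmax > result[date]:
            result[date] = tmax
    return result


def eager(rows):
    # Each step builds a complete list before the next step starts.
    good_rows = [row for row in rows if keep(row)]
    selected = [(row[1], row[3] / 10) for row in good_rows]
    return max_by_date(selected)


def lazy(rows):
    # Generator expressions: nothing runs until max_by_date pulls rows
    # through one at a time, so no intermediate list is ever stored.
    good_rows = (row for row in rows if keep(row))
    selected = ((row[1], row[3] / 10) for row in good_rows)
    return max_by_date(selected)
```

Both functions return the same dictionary; only the amount of intermediate data held at once differs.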
Another newer DataFrame tool: Polars is a "lightning-fast DataFrame library for Rust and Python".
It's implemented in Rust and designed to be fast (efficient code + multithreading). There's a Rust API, but I'll demo in Python.
Polars can read multiple files into a DataFrame, but we have to explicitly give a /* to refer to all files in a directory.
This gets us a Polars DataFrame:
import polars as pl

colnames = ['station', 'date', 'observation', 'value', 'mflag', 'qflag', 'sflag', 'obstime']
weather = pl.read_csv(input, has_header=False, new_columns=colnames, schema_overrides={'date': pl.Utf8})
The operations are spelled differently, but conceptually like what we did with Spark or Pandas. The filtering:
good_rows = weather.filter(
    (pl.col('qflag').is_null())
    & pl.col('station').str.starts_with('CA')
    & (pl.col('observation') == 'TMAX')
)
The selecting/computation:
filtered_weather = good_rows.select(
    pl.col('station'),
    date=pl.col('date').str.to_date("%Y%m%d"),
    tmax=pl.col('value')/10
)
The aggregation:
maxtemp = filtered_weather.group_by('date') \
    .agg(max_temp=pl.max('tmax')) \
    .sort(pl.col('date'))
And write newline-delimited JSON (on A4, we required multiple files and GZIP compression: Polars won't do either):
maxtemp.write_ndjson(output)
As used in that code, Polars is strictly evaluated like Pandas, not lazy like Spark. Sometimes that's good, but often not. We have seen that optimizing a whole chain of calculations at once can be very effective.
Polars can be lazily evaluated: we just need to create a LazyFrame instead of a DataFrame.
In our code, this line created a Polars DataFrame, and all operations on it were strict:
weather = pl.read_csv(input, has_header=False, new_columns=colnames, schema_overrides={'date': pl.Utf8})
But this would create a LazyFrame out of multiple files, and the same code on it would be lazily evaluated:
weather = pl.scan_csv(inputs + '/*', glob=True, has_header=False, new_columns=colnames, schema_overrides={'date': pl.Utf8})
Then we end differently to trigger the evaluation. Before:
maxtemp.write_ndjson(output)
Becomes:
maxtemp.collect().write_ndjson(output)
Otherwise, the code is exactly the same, and the result is the same. LazyFrames have almost exactly the same methods as Polars DataFrames.
Running the Polars DataFrame version on weather-3: maximum memory usage is 5.5 GB and it takes 0.6 s.
The LazyFrame version: 700 MB used in 0.2 s.
Once again, lazy evaluation is usually a good idea on data analysis tasks. (Compare Pandas: 15 s and 9 GB.)
My overview:
Polars (when working on LazyFrames) can generally work on data sets larger than memory. It will work through the data row-by-row as much as possible and produce results without storing the whole data set in memory.
Its (still unstable) streaming mode is intended to help with this.
We can ask Polars to use streaming mode to write without collecting the whole data set in memory:
maxtemp.collect().write_ndjson(output)
Becomes:
maxtemp.sink_ndjson(output)
This takes longer: 0.9 s (vs 0.2 s), but uses less memory: 450 MB (vs 700 MB). I predict future improvements.
The project founder has announced that they're starting a company. From the announcement:
We believe that the Polars API can be used for both local and cloud/distributed environments. Our API is designed to work well on multiple cores, this design also makes it well poised for a distributed environment.
Keep an eye on Polars: it might not exactly be a "big data" tool today, but maybe in a few years.
DuckDB isn't exactly designed as a DataFrame tool, but as an in-process SQL database (like SQLite) that is optimized for analytics (i.e. OLAP not OLTP).
The idea is to get a local relational database you can do analytics with (and persist as a file), without having to install an entire database server.
But it's also a fast way to manipulate tabular data, so if we want to treat it like a DataFrame tool…
DuckDB operations tend to be SQL-like, so you give a lot of long strings describing SQL queries. It can read multiple files into one table with a /* glob, as with scan_csv in Polars:
import duckdb

con = duckdb.connect()
weather = con.sql("""SELECT * FROM read_csv('""" + input + """/*',
    header=false,
    columns={'station': 'VARCHAR', 'date': 'VARCHAR', 'observation': 'VARCHAR',
        'value': 'INTEGER', 'mflag': 'VARCHAR', 'qflag': 'VARCHAR',
        'sflag': 'VARCHAR', 'obstime': 'VARCHAR'}
)""")
Operations on the data also tend to be SQL-based:
filtered_weather = con.sql('''
    SELECT station, strptime(date, '%Y%m%d') AS date, value/10 AS tmax
    FROM weather
    WHERE observation='TMAX' AND station LIKE 'CA%' AND qflag IS NULL
''')
Python variable names for tables (weather and filtered_weather here) are automatically understood as table names in queries.
maxtemp = con.sql('''
    SELECT strftime(date, '%Y-%m-%d') AS date, MAX(tmax) AS max_temp
    FROM filtered_weather
    GROUP BY date
    ORDER BY date
''')
And then save the table to a file: [I feel like there must be a better way to do this, but I couldn't find it.]
con.sql('''COPY maxtemp TO ''' + repr(output))
Or a table can be converted to a Pandas, Polars, or Arrow DataFrame.
Doing this on weather-3 took 1.0 s with peak memory usage of 160 MB.
Like Polars, DuckDB is very interested in maximum performance. They publish many benchmarks against competing tools.
Likely conclusion from what I have seen from DuckDB and Polars: if you don't absolutely need a compute cluster, both are much faster than Spark. If you don't need the compatibility of Pandas, they have much better performance and memory usage.
If you pass what's possible on one computer, Spark can deal and Polars/DuckDB can't.
Dask is another Python data tool. Its goal is to recreate the API from Pandas (and NumPy and Scikit-Learn) as much as possible, but do lazy evaluation, and allow distributed computation (like Spark).
We have to create a Dask Client object to put it in "local cluster" mode, but then creating a DataFrame is almost like Pandas, with /* to read a whole directory:
import dask.dataframe as dd
from dask.distributed import Client

client = Client()
colnames = ['station', 'date', 'observation', 'value', 'mflag', 'qflag', 'sflag', 'obstime']
dtypes = {'date': str, 'qflag': str, 'mflag': str}
weather = dd.read_csv(inputs + '/*', header=None, names=colnames, dtype=dtypes)
The rest of the logic is exactly the same as Pandas: Dask deliberately recreates its API as much as possible.
good_rows = weather[
    weather['qflag'].isnull()
    & weather['station'].str.startswith('CA')
    & (weather['observation'] == 'TMAX')
]
Small change: couldn't do inplace=True since Dask doesn't support that option. Otherwise, we finish exactly like Pandas:
filtered = good_rows[['station', 'date', 'value']].copy()
filtered['date'] = dd.to_datetime(filtered['date'], format='%Y%m%d')
filtered['value'] = filtered['value'] / 10
filtered = filtered.rename(columns={'value': 'tmax'})
maxtemp = filtered.groupby('date').agg({'tmax': 'max'})
maxtemp = maxtemp.reset_index().sort_values('date')
maxtemp = maxtemp.rename(columns={'tmax': 'max_temp'})
maxtemp['date'] = maxtemp['date'].dt.strftime('%Y-%m-%d')
maxtemp.to_json(output, orient='records', lines=True)
The Dask implementation runs in about 9.5 s and takes about 1 GB memory: similar to Spark.
Dask doesn't just have almost-Pandas DataFrames. It also implements arrays that are almost-NumPy, bags that are similar to Spark RDDs, and delayed tasks and futures for more general concurrent calculations.
Dask can also be deployed on a cluster in many ways, for example by running dask scheduler on one machine and dask worker on many.
Dask is aiming to scale as well as Spark, but be mostly-compatible with traditional Python data tools (Pandas, NumPy).
If you come across existing Pandas code that's struggling with speed/memory, Dask might be exactly the tool you want.
It's not clear to me that the Dask API can deal with the weird stuff that Spark can (e.g. repartitioning, caching, broadcast hints).
Last minute entry, literally 30 minutes before the lecture: FireDucks.
In the Pandas code, I simply changed:
import pandas as pd
to:
import fireducks.pandas as pd
And it ran in 0.4 s, using 4.6 GB of memory.
Bonus incomplete summary: from the RAPIDS ecosystem that runs on NVIDIA GPUs, the cuDF library. The Pandas code runs as-is, simply search-and-replace pd with cudf.
I'm not timing it: this task is too compute-light to make sense on a GPU.
Identical task and results on our weather-3 data set.
Tool | Time | Memory |
---|---|---|
Pandas | 15.0 s | 8700 MB |
Spark DataFrame | 3.6 s | 1000 MB |
Polars DataFrame | 0.6 s | 5500 MB |
Polars LazyFrame | 0.2 s | 700 MB |
DuckDB | 1.0 s | 160 MB |
Dask DataFrame | 9.5 s | 1000 MB |
FireDucks | 0.4 s | 4600 MB |
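To make the table easier to compare, the ratios against Pandas can be computed directly from its numbers. This is just arithmetic on the values above; relative_to is a hypothetical helper name.

```python
# (time in seconds, peak memory in MB), copied from the table above.
RESULTS = {
    'Pandas': (15.0, 8700),
    'Spark DataFrame': (3.6, 1000),
    'Polars DataFrame': (0.6, 5500),
    'Polars LazyFrame': (0.2, 700),
    'DuckDB': (1.0, 160),
    'Dask DataFrame': (9.5, 1000),
    'FireDucks': (0.4, 4600),
}


def relative_to(baseline, results=RESULTS):
    """Return {tool: (speedup, memory ratio)} relative to the baseline tool."""
    base_time, base_mem = results[baseline]
    return {
        tool: (base_time / seconds, base_mem / megabytes)
        for tool, (seconds, megabytes) in results.items()
    }


for tool, (speedup, mem_ratio) in relative_to('Pandas').items():
    print(f'{tool:18} {speedup:6.1f}x the speed, {mem_ratio:5.1f}x less memory')
```

These are single measurements on one task, so the ratios are rough indications rather than a benchmark.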
Proposed explanations: Dask and Spark are more complex tools ready to cluster, so some of the time and memory usage is more about starting the complex system, and would pay off at some scale.
They seem to not be as well optimized for single-machine use: Polars and DuckDB care about every byte and CPU cycle. Spark and Dask care about working on arbitrarily large data sets.
Compare some much better benchmarking: "Spark, Dask, DuckDB, Polars: TPC-H Benchmarks at Scale", a DataFrame comparison from a Dask perspective.
The lesson: there are many data tools out there. "Like Pandas but faster/better" is a huge and competitive category. Some others: