Other DataFrame Tools

CMPT 353

Other DataFrame Tools

DataFrames are great: both Pandas and Spark give us DataFrames with many tools to work with the data in useful ways.

Let's look at a few more DataFrame tools that can be used in Python. They could variously be considered big data tools: some work on partitioned data, most don't distribute computation.

Other DataFrame Tools

For each one, I'll reproduce the weather ETL problem from Exercise 9.

Plus I'll convert the dates (i.e. "20131231" to a real date type) and aggregate by date to find the maximum Canadian temperature on each day.

Pandas

Let's review what we know first…

Pandas is the original DataFrame library for Python (as far as I know). It's the one where you'll find the most docs/​tutorials/​Stackoverflow answers.

Just about every other Python data tool is interoperable with Pandas: if you can imagine it, there's a way to convert to/from a Pandas DataFrame.

Pandas

But it's the opposite of what I'd expect in a big data tool:

  • Everything stored in memory (Pandas Series ≈ in‑memory array).
  • Strictly (not lazily) evaluated: each intermediate result calculated and stored.
  • Mostly single-threaded.

Pandas

Pandas can't read the directory-of-partitions style input: I had to manually concatenate them before reading:

colnames = ['station', 'date', 'observation', 'value',
    'mflag', 'qflag', 'sflag', 'obstime']
weather = pd.read_csv(input, header=None,
    names=colnames, dtype={'date': str})

Without the dtype argument, the dates (like "20131231") got detected as integers.

Pandas

The filtering spelled with Pandas:

good_rows = weather[
    weather['qflag'].isnull()
    & weather['station'].str.startswith('CA')
    & (weather['observation'] == 'TMAX')
]

Pandas

The selecting/​calculation, doing my best to calculate in-place to not waste memory:

filtered = good_rows[['station', 'date', 'value']].copy()
filtered['date'] = pd.to_datetime(filtered['date'],
    format='%Y%m%d')
filtered['value'] = filtered['value'] / 10
filtered.rename(columns={'value': 'tmax'}, inplace=True)

Pandas

And the addition: max temp by day. Then output newline-delimited JSON.

maxtemp = filtered.groupby('date').agg({'tmax': 'max'})
maxtemp = maxtemp.reset_index().sort_values('date')
maxtemp = maxtemp.rename(columns={'tmax': 'max_temp'})
maxtemp['date'] = maxtemp['date'].dt.strftime('%Y-%m-%d')
maxtemp.to_json(output, orient='records', lines=True)

Pandas

The result: it takes about 15 seconds to complete this on our weather-3 data set, and it uses almost 9 GB of memory to do it (measured by /usr/bin/time -v).

Spark

We have seen Spark as a big data mostly-DataFrame tool. It's lazily evaluated and better on larger data sets, but…

  • Spark is slow to start up and can be cumbersome on non-big data sets.
  • The JVM (where the Scala implementation runs) is designed to be reasonably fast, but not usually to the limit of your processor's performance.
  • The JVM tends to be memory-hungry.
  • Most data sets can be stored/​processed just fine on one computer.

Spark

For comparison: my Spark DataFrame implementation takes 3.6 s and needs just over 1 GB of memory.

Spark's lazy evaluation means we don't calculate/​store intermediate results, and the whole computation can be optimized. Also it uses all available CPU cores to do the work.

Polars

Another newer DataFrame tool: Polars is a lightning-fast DataFrame library for Rust and Python.

It's implemented in Rust and designed to be fast (efficient code + multithreading). There's a Rust API, but I'll demo in Python.

Polars

This gets us a Polars DataFrame:

colnames = ['station', 'date', 'observation', 'value',
    'mflag', 'qflag', 'sflag', 'obstime']
weather = pl.read_csv(input, has_header=False,
    new_columns=colnames, schema_overrides={'date': pl.Utf8})

Polars

The operations are spelled differently, but conceptually like what we did with Spark or Pandas. The filtering:

good_rows = weather.filter(
    (pl.col('qflag').is_null())
    & pl.col('station').str.starts_with('CA')
    & (pl.col('observation') == 'TMAX')
)

Polars

The selecting/​computation:

filtered_weather = good_rows.select(
    pl.col('station'),
    date=pl.col('date').str.to_date("%Y%m%d"),
    tmax=pl.col('value')/10
)

Polars

The aggregation:

maxtemp = filtered_weather.group_by('date') \
    .agg(max_temp=pl.max('tmax')) \
    .sort(pl.col('date'))

And write newline-delimited JSON (on E9, we required multiple files and GZIP compression: Polars won't do either):

maxtemp.write_ndjson(output)

Polars

As used in that code, Polars is strictly evaluated like Pandas, not lazy like Spark. Sometimes that's good, but often not. We have seen that optimizing a whole chain of calculations at once can be very effective.

Polars can be lazily evaluated: we just need to create a LazyFrame instead of a DataFrame.

Polars

In our code, this line created a Polars DataFrame, and all operations on it were strict:

weather = pl.read_csv(input, has_header=False,
    new_columns=colnames, schema_overrides={'date': pl.Utf8})

But this would create a LazyFrame out of multiple files, and the same code on it would be lazily evaluated:

weather = pl.scan_csv(inputs + '/*', glob=True,
    has_header=False, new_columns=colnames,
    schema_overrides={'date': pl.Utf8})

Polars

Then we end differently to trigger the evaulation. Before:

maxtemp.write_ndjson(output)

Becomes:

maxtemp.collect().write_ndjson(output)

Otherwise, the code is exactly the same, and the result is the same. LazyFrames have almost exactly the same methods as Polars DataFrames.

Polars

Running the Polars DataFrame version on weather-3: maximum memory usage is 5.5 GB and takes 0.6 s.

The LazyFrame version: 700 MB used in 0.2 s.

Once again, lazy evaluation is usually a good idea on data analysis tasks. (Compare Pandas: 15 s and 9 GB.)

Polars

My overview:

  • The Polars API is beautiful.
  • It's implemented carefully in Rust. With that comes maximum speed and minimum memory.
  • Multi-threaded + SIMD implementation: use all of your local cores.
  • Lazy or not: you choose. Line-by-line if you want.
  • Sadly, no partitioning or clustering (yet).

Polars

Polars (when working on LazyFrames) can generally work on data sets larger than memory. It will work through the data row-by-row as much as possible and produce results without storing the whole data set in memory.

Its (still unstable) streaming mode is intended to help with this.

Polars

We can ask Polars to use streaming mode to write without collecting the whole data set in memory:

maxtemp.collect().write_ndjson(output)

Becomes:

maxtemp.sink_ndjson(output)

This takes longer: 0.9 s (vs 0.2 s), but uses less memory: 450 MB (vs 700 MB). I predict future improvements.

Polars

The project founder has announced that they're starting a company and in the announcement:

We believe that the Polars API can be used for both local and cloud/distributed environments. Our API is designed to work well on multiple cores, this design also makes it well poised for a distributed environment.

Keep an eye on Polars: it might not exactly be a big data tool today, but maybe in a few years.

DuckDB

DuckDB isn't exactly designed as a DataFrame tool, but as an in-process SQL database (like SQLite) that is optimized for analytics (i.e. OLAP not OLTP).

The idea is to get a local relational database you can do analytics with (and persist as a file), without having to install an entire database server.

But it's also a fast way to manipulate tabular data, so if we want to treat it like a DataFrame tool…

DuckDB

DuckDB operations tend to be SQL-like, so you give a lot of long strings describing SQL queries. It can read multiple files into one table with a /* like Polars' scan_csv:

con = duckdb.connect()
weather = con.sql("""SELECT *
    FROM read_csv('""" + input + """/*', header=false,
        columns={'station': 'VARCHAR', 'date': 'VARCHAR', 'observation': 'VARCHAR', 'value': 'INTEGER', 'mflag': 'VARCHAR', 'qflag': 'VARCHAR', 'sflag': 'VARCHAR', 'obstime': 'VARCHAR'}
    )""")

DuckDB

Operations on the data also tend to be SQL-based:

filtered_weather = con.sql('''
    SELECT
        station,
        strptime(date, '%Y%m%d') AS date,
        value/10 AS tmax
    FROM weather
    WHERE observation='TMAX'
        AND station LIKE 'CA%'
        AND qflag IS NULL
    ''')

DuckDB

Python variable names for tables (weather and filtered_weather here) are automatically understood as table names in queries.

maxtemp = con.sql('''
    SELECT
        strftime(date, '%Y-%m-%d') AS date,
        MAX(tmax) AS max_temp
    FROM filtered_weather
    GROUP BY date ORDER BY date
''')

DuckDB

And then save the table to a file: [I feel like there must be a better way to do this, but I couldn't find it.]

con.sql('''COPY maxtemp TO ''' + repr(output))

Or a table can be converted to a Pandas, Polars, or Arrow DataFrame.

DuckDB

Doing this on weather-3 took 1.0 s with peak memory usage of 160 MB.

Like Polars, DuckDB is very interested in maximum performance. They publish many benchmarks against competing tools.

DuckDB

Likely conclusion from what I have seen from DuckDB and Polars: if you don't absolutely need a compute cluster, both are much faster than Spark. If you don't need the compatibilty of Pandas, they have much better performance and memory usage.

If you pass what's possible on one computer, Spark can deal and Polars/​DuckDB can't.

Dask

Another Python data tool. Its goal is to recreate the API from Pandas (and NumPy and Scikit-Learn) as much as possible, but do lazy evaluation, and allow distributed computation (like Spark).

Dask

We have to create a Dask Client object to put it in local cluster mode, but then creating a DataFrame is almost like Pandas with /* to read a whole directory:

import dask.dataframe as dd
from dask.distributed import Client
client = Client()
colnames = ['station', 'date', 'observation', 'value', 'mflag', 'qflag', 'sflag', 'obstime']
dtypes = {'date': str, 'qflag': str, 'mflag': str}
weather = dd.read_csv(inputs + '/*', header=None,
    names=colnames, dtype=dtypes)

Dask

The rest of the logic is exactly and same as Pandas: Dask deliberately recreates its API as much as possible.

good_rows = weather[
    weather['qflag'].isnull()
    & weather['station'].str.startswith('CA')
    & (weather['observation'] == 'TMAX')
]

Dask

Small change: couldn't do inplace=True since Dask doesn't support that option. Otherwise, we finish exactly like Pandas:

filtered = good_rows[['station', 'date', 'value']].copy()
filtered['date'] = dd.to_datetime(filtered['date'],
    format='%Y%m%d')
filtered['value'] = filtered['value'] / 10
filtered = filtered.rename(columns={'value': 'tmax'})
maxtemp = filtered.groupby('date').agg({'tmax': 'max'})
maxtemp = maxtemp.reset_index().sort_values('date')
maxtemp = maxtemp.rename(columns={'tmax': 'max_temp'})
maxtemp['date'] = maxtemp['date'].dt.strftime('%Y-%m-%d')
maxtemp.compute().to_json(output, orient='records', lines=True)

Dask

The Dask implementation runs in about 9.5 s and takes about 1 GB memory: similar to Spark.

Dask doesn't just have almost-Pandas DataFrames. It also implements arrays that are almost-NumPy, bags that are similar to Spark RDDs, and delayed tasks and futures for more general concurrent calculations.

Dask

Dask can also be deployed on a cluster in many ways:

  • local cluster: use all local cores;
  • across many machines: run dask scheduler on one machine and dask worker on many;
  • on supercomputer infrastructure (e.g. slurm scheduler);
  • on Kubernetes;
  • on YARN.

Dask

Dask is aiming to scale as well as Spark, but be mostly-compatible with traditional Python data tools (Pandas, NumPy).

If you come across existing Pandas code that's struggling with speed/​memory, Dask might be exactly the tool you want.

It's not clear to me that the Dask API can deal with the weird stuff that Spark can (e.g. repartitioning, caching, broadcast hints).

Rapids' cuDF

Bonus incomplete summary: from the RAPIDS ecosystem that runs on NVIDIA GPUs, the cuDF library. The Pandas code runs as-is, simply search-and-replace pd with cudf.

I'm not timing it: this task is too compute-light to make sense on a GPU.

Summary

Identical task and results on our weather-3 data set.

ToolTimeMemory
Pandas15.0 s8700 MB
Spark DataFrame3.6 s1000 MB
Polars DataFrame0.6 s5500 MB
Polars LazyFrame0.2 s700 MB
DuckDB1.0 s160 MB
Dask DataFrame9.5 s1000 MB

Summary

Proposed explanations: Dask and Spark are more complex tools ready to cluster, so some of the time and memory usage is more about starting the complex system, and would pay off at some scale.

They seem to not be as well optimized for single-machine use: Polars and DuckDB care about every byte and CPU cycle. Spark and Dask care about working on arbitrarily large data sets.

Summary

Compare some much better benchmarking, Spark, Dask, DuckDB, Polars: TPC-H Benchmarks at Scale, a DataFrame comparison from a Dask perspective.

The lesson: there are many data tools out there. Like Pandas but faster is a huge and competitive category. Some others: