We have spent most of our time with two tools for storing/manipulating data in Python: Pandas and Spark.
In some ways they do the same job…
They have a lot in common:
But they are also different in some important ways:
It might be nice to have a mix of the things we like about Pandas (the API) and about Spark (multi-threaded, distributed, lazy).
This thinking leads to Dask.
The idea of Dask: recreate the APIs of Pandas (and NumPy and Scikit-Learn), but with lazy evaluation and the option of distributed computation.
The way we work with Dask diverges a little from Pandas, but the goal is clearly to change as little as possible.
Basic setup: import the Dask DataFrame functionality, and create a client to work on the local machine.
import sys
import dask.dataframe as dd
from dask.distributed import Client

def main(input_dir, output_dir):
    client = Client()
Dask demands the Python main function trick:
if __name__ == '__main__':
    main(sys.argv[1], sys.argv[2])
I'm going to recreate the weather ETL task from exercise 9…
We can construct a DataFrame from a collection of CSV files (like Spark), using almost the same function call as Pandas:
colnames = ['station', 'date', 'observation', 'value', 'mflag', 'qflag', 'sflag', 'obstime']
weather = dd.read_csv(input_dir + '/*.csv.gz', blocksize=None,
                      compression='gzip', header=None, names=colnames)
The file glob (*.csv.gz) wasn't necessary in Spark. The blocksize argument doesn't exist in Pandas, but can be used to give Dask information about how to partition unpartitioned input.
After that, the work is done with code that would be exactly the same with Pandas:
good_q = weather[weather['qflag'].isnull()]
only_canada = good_q[good_q['station'].str.startswith('CA')]
tmax = only_canada[only_canada['observation'] == 'TMAX']
cleaned = tmax[['station', 'date']]
cleaned['tmax'] = tmax['value'] / 10
Saving the results is also exactly like Pandas.
cleaned.to_json(output_dir, orient='records', lines=True, compression='gzip')
… except that all of the calculations actually happen at this point, and it produces a directory of output files, like Spark.
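If you want the results in memory instead of files, .compute() forces the lazy work to run and returns an ordinary Pandas DataFrame (a small sketch using the cleaned DataFrame from above):
result = cleaned.compute()  # runs the whole task graph; result is a regular pandas DataFrame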
Run it like Pandas code, but with the right stuff installed first:
pip3 install dask[complete]
python3 dask_weather.py weather-1 output
Result: same output as Pandas, split between files, and calculated using all processor cores/threads.
While it's running, you can access http://localhost:8787/ to see the job progress.
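The same address is available programmatically; in recent versions of dask.distributed, the Client object exposes it:
print(client.dashboard_link)  # URL of the diagnostic dashboard for this client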
client is implicitly in charge of making sure the calculations happen… some time, somewhere.
client = Client()
This default runs tasks on the local computer, in parallel as much as possible. Without this line, things still run on the local computer, but with Dask's default local scheduler, which often behaves much like single-threaded Pandas.
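You can also pick the local scheduler explicitly without creating a Client, using the scheduler keyword on .compute() (the values shown are the standard built-in schedulers):
cleaned.compute(scheduler='single-threaded')  # easiest to debug
cleaned.compute(scheduler='threads')          # threads in one process
cleaned.compute(scheduler='processes')        # separate processes, avoiding the GIL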
But there are also other ways to run Dask work…
You can manually start a Dask cluster with one scheduler (with IP address 192.168.1.2):
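Something like this on the scheduler machine (the command is installed with dask.distributed; newer versions also spell it "dask scheduler"):
dask-scheduler    # listens on port 8786 by default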
And several workers on other computers:
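Again, roughly (each worker is pointed at the scheduler's address and default port):
dask-worker tcp://192.168.1.2:8786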
Then, use that cluster in your code:
client = Client('192.168.1.2:8786')
Or you can deploy Dask jobs on Kubernetes, YARN/Hadoop, or supercomputer infrastructure (MPI, Slurm). Overall, these options seem to assume shared storage (a shared filesystem, HDFS, S3, or similar).
[I successfully ran a Dask job with YARN on our cluster, but not with HDFS inputs/outputs.]
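For reference, the YARN route typically goes through the separate dask-yarn package; roughly like this (the argument values are placeholders, and the details depend on your cluster):
from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='environment.tar.gz',  # packaged Python environment for the workers
                      worker_vcores=2, worker_memory='4GiB')
client = Client(cluster)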
Dask also gives you:
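One example: a NumPy-like array interface, dask.array, which is lazy and chunked in the same way the DataFrame interface mirrors Pandas (a minimal sketch):
import dask.array as da
x = da.random.random((100000, 1000), chunks=(10000, 1000))  # lazy, chunked "NumPy" array
col_means = x.mean(axis=0)     # still lazy: just builds the task graph
print(col_means.compute())     # runs the graph (in parallel) and returns a NumPy array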
Dask looks like an extremely promising data science tool: probably worth more attention than it gets.
It's not clear (to me) that Dask has the same tools as Spark to tweak job performance: the optimizer, caching, repartitioning, etc.
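For what it's worth, Dask DataFrames do have persist() and repartition() methods that at least superficially resemble Spark's caching and partition control:
cleaned = cleaned.repartition(npartitions=8)  # change the number of partitions
cleaned = cleaned.persist()                   # start computing and keep the result in (distributed) memory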
The Dask documentation feels incomplete. Or maybe there's an implicit qualifier "everything else works like Pandas" for every function. Except things that don't, e.g. no …
After a very superficial look at Dask, my gut tells me that:
My best guess at default choices:
… depending on dozens of other factors, of course.