Aside: Dask

CMPT 353

Python Data Tools

We have spent most of our time with two tools for storing/​manipulating data in Python: Pandas and Spark.

In some ways they do the same job…

Python Data Tools

They have a lot in common:

  • Both give us DataFrames.
  • Working with data is generally a whole-DataFrame operation.
  • Operations are specified at a high level, and the implementation details aren't (usually) our problem.

Python Data Tools

But they are also different in some important ways:

  • Pandas is eagerly evaluated (operations are done when the line of code runs); Spark is lazily evaluated (operations are deferred until later), as sketched after this list.
  • Pandas keeps everything in memory on one computer; Spark can distribute across a cluster.
  • Pandas computes single-threaded; Spark uses all cores or a compute cluster.
  • Different API: the details of functions/​methods are different. Pandas is generally richer.
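As a quick sketch of the eager-vs-lazy difference (not course code: the tiny DataFrame is made up, and it assumes Pandas and PySpark are both installed):

import pandas as pd
from pyspark.sql import SparkSession, functions as F

pdf = pd.DataFrame({'x': [1.0, 2.0, 3.0]})
mean_now = pdf['x'].mean()           # Pandas: computed immediately (eager)

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(pdf)
lazy_mean = sdf.select(F.mean('x'))  # Spark: only builds a plan (lazy)
result = lazy_mean.collect()         # the computation happens here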

Dask

It might be nice to have a mix of the things we like about Pandas (the API) and about Spark (multi-threaded, distributed, lazy).

This thinking leads to Dask.

Dask

The idea of Dask: recreate the API from Pandas (and NumPy and Scikit-Learn), but do lazy evaluation, and allow distributed computation.

The way we work with Dask is going to diverge a little from Pandas, but the design goal was obviously to change as little as possible.
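As a minimal sketch of that design goal (the toy DataFrame here is made up): the DataFrame API tracks Pandas very closely, but nothing is calculated until you ask for a result with .compute().

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': range(10), 'y': range(10)})
ddf = dd.from_pandas(pdf, npartitions=2)  # same columns, now in 2 partitions

total = ddf['x'].sum()    # lazy: builds a task graph, like Spark
print(total.compute())    # .compute() triggers the actual work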

Working With Dask

Basic setup: import the Dask DataFrame functionality, and create a client to work on the local machine.

import sys
import dask.dataframe as dd
from dask.distributed import Client

def main(input_dir, output_dir):
    client = Client()

Dask demands the standard Python main-function guard, since it spawns worker processes:

if __name__ == '__main__':
    main(sys.argv[1], sys.argv[2])

Working With Dask

I'm going to recreate the weather ETL task from exercise 9…

Working With Dask

We can construct a DataFrame from a collection of CSV files (like Spark), using almost the same function call as in Pandas:

colnames = ['station', 'date', 'observation', 'value', 'mflag', 'qflag', 'sflag', 'obstime']
weather = dd.read_csv(input_dir + '/*.csv.gz', blocksize=None,
    compression='gzip', header=None, names=colnames,
    dtype={'qflag': 'str'})

The file glob (*.csv.gz) wasn't necessary in Spark. The blocksize argument doesn't exist in Pandas: it tells Dask how to partition input that isn't already partitioned. Dask is also pickier about types, hence the explicit dtype.
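For contrast, a hypothetical call on uncompressed input (the path and block size are made up; colnames is as defined above): with plain .csv files, blocksize can split each file into several partitions, which gzipped files don't allow (hence blocksize=None above).

weather_unzipped = dd.read_csv('weather-uncompressed/*.csv',
    blocksize='64MB', header=None, names=colnames,
    dtype={'qflag': 'str'})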

Working With Dask

After that, the work is done with code that would be exactly the same with Pandas:

good_q = weather[ weather['qflag'].isnull() ]
only_canada = good_q[ good_q['station'].str.startswith('CA') ]
tmax = only_canada[ only_canada['observation'] == 'TMAX' ]

cleaned = tmax[['station', 'date']]
cleaned['tmax'] = tmax['value'] / 10

Working With Dask

Saving the results is also exactly like Pandas.

cleaned.to_json(output_dir, orient='records', lines=True,
    compression='gzip')

… except all calculations happen now, and it produces a directory of files, like Spark.
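As an aside (a sketch, not part of the exercise): if the cleaned result is small enough to fit in memory, calling .compute() instead runs the whole task graph and returns an ordinary in-memory Pandas DataFrame.

result = cleaned.compute()   # a pandas.DataFrame, computed right now
print(result.shape)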

Working With Dask

Run it like Pandas code, but with the right stuff installed first:

pip3 install dask[complete]
python3 dask_weather.py weather-1 output

Result: same output as Pandas, split between files, and calculated using all processor cores/​threads.

While it's running, you can access http://localhost:8787/ to see the job progress.
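If you'd rather not guess the URL, the Client object can report it (a small sketch):

print(client.dashboard_link)   # e.g. http://127.0.0.1:8787/status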

Scheduling Dask

The Dask client is implicitly in charge of making sure the calculations happen… some time, somewhere.

client = Client()

This default runs tasks on the local computer, in parallel where possible. Without this line, Dask falls back to its simpler default scheduler (a local thread pool), and there's no dashboard.
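You can also size the local cluster explicitly; a sketch (these keyword arguments are passed through to the underlying LocalCluster):

from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=2,
    memory_limit='2GB')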

But there are also other ways to run Dask code.

Scheduling Dask

You can start a Dask cluster manually. One scheduler (on the machine with IP address 192.168.1.2):

dask-scheduler

And several workers on other computers:

dask-worker tcp://192.168.1.2:8786

Then, use that cluster in your code:

client = Client('192.168.1.2:8786')

Scheduling Dask

Or you can deploy Dask jobs on Kubernetes, YARN/Hadoop, or supercomputer infrastructure (MPI, Slurm). Overall, these deployment options seem to assume shared storage (a shared filesystem, HDFS, S3, or similar).

[I successfully ran a Dask job with YARN on our cluster, but not with HDFS inputs/​outputs.]
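For the YARN case, the separate dask-yarn package provides a cluster object to hand to the Client. A sketch only (the packaged environment filename and resource numbers are made up, following dask-yarn's documented arguments):

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='environment.tar.gz',
    worker_vcores=2, worker_memory='4GiB')
client = Client(cluster)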

More Dask Features

Dask also gives you (briefly sketched below):

  • Arrays: like NumPy, but distributed.
  • Bags: like Spark RDDs, where you have a collection of Python objects to work with.
  • ML: like Scikit-Learn, but on Dask data structures.
  • Delayed & Futures: more advanced scheduling of your computation.
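A quick sketch of a few of these (the values are made up; assumes dask[complete] is installed):

import dask.array as da
import dask.bag as db
from dask import delayed

x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.mean().compute())          # NumPy-like, chunked, lazy

b = db.from_sequence(range(10))
print(b.map(lambda n: n * n).sum().compute())   # RDD-like

total = delayed(sum)([delayed(abs)(-1), delayed(abs)(-2)])
print(total.compute())             # plain functions, deferred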

Dask Summary

Dask looks like an extremely promising data science tool: probably worth more attention than it gets.

Dask Summary

It's not clear (to me) that Dask has the same tools as Spark to tweak job performance: the optimizer, caching, repartitioning, etc.

The Dask documentation feels incomplete. Or maybe there's an implicit "everything else works like Pandas" qualifier on every function, except for the things that don't: e.g. there used to be no .sort_values() method, now there is, but the docs are unchanged.

Dask Summary

After a very superficial look at Dask, my gut tells me that:

  1. Dask is a great way to adapt NumPy/​Pandas code to a data set that became too big to work with using NumPy/​Pandas.
  2. Spark is probably better designed to work at the truly-big scale.

Dask Summary

My best guess at default choices:

  • Megabytes: Pandas.
  • Gigabytes: Pandas or Dask.
  • Many gigabytes: Dask or Spark.
  • Terabytes: Spark.

… depending on dozens of other factors, of course.

Dask Summary

Dask is one of many tools in the broad and growing "like Pandas, but faster" category; there are several others I know about.

It's hard to predict which of these (if any) will be important in a few years.

Dask Summary

No, there won't be anything about Dask on the exam.