Dask & Ray

CMPT 732, Fall 2023

Dask

Another tool that deserves mention: Dask.

The idea: recreate the API from Pandas (and NumPy and Scikit-Learn) as much as possible, but do lazy evaluation, and allow distributed computation (like Spark).

As a demo, let's recreate the Most-Viewed Wikipedia Pages solution in Dask…

[Complete code: dask_wikipedia.py]

Dask

Basic setup: import the Dask DataFrame functionality, and create a client to work on the local machine.

import sys
import dask.dataframe as dd
from dask.distributed import Client


def main(input_dir, output_dir):
    client = Client()

Dask

We can construct a DataFrame from a collection of files (like Spark), using almost the same function call as Pandas:

colnames = ['timestamp', 'lang', 'title', 'views', 'bytes']
types = {'timestamp': 'str', 'lang': 'str', 'title': 'str'}
pageviews = dd.read_csv(input_dir + '/*.gz', sep=' ',
    blocksize=None, compression='gzip', header=None,
    names=colnames, dtype=types)

The file glob (*.gz) wasn't necessary in Spark and isn't possible in Pandas. The blocksize argument doesn't exist in Pandas, but can be used to give information about how to partition unpartitioned input.
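For uncompressed input, blocksize controls how Dask splits each file into partitions. A minimal sketch (the path and size here are made up, not part of the demo):

# plain-text CSVs can be split into blocks; gzip files can't, hence blocksize=None above
pageviews = dd.read_csv(input_dir + '/*.csv', sep=' ',
    blocksize='64MB', header=None,
    names=colnames, dtype=types)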

Dask

After that, the work is done with code that would be exactly the same in Pandas:

en_only = pageviews[pageviews['lang'] == 'en']
pages = en_only[(en_only['title'] != 'Main_Page')
    & (~en_only['title'].str.startswith('Special:'))]
max_views = pages[['timestamp', 'views']] \
    .groupby('timestamp').max().reset_index()
only_max = pages.merge(max_views, on=['timestamp', 'views'])

Dask

Saving the results is also exactly like Pandas.

only_max.to_json(output_dir, orient='records', lines=True,
    compression='gzip')

… except all calculations happen now, and it produces a directory of files, like Spark.
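If the final result is small enough to fit in memory, you can instead call .compute() to get an ordinary Pandas DataFrame back (a small sketch, not part of the original solution):

# forces the whole lazy computation and returns an in-memory Pandas DataFrame
result = only_max.compute()
print(result.head())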

Dask

Run it like normal Python code, but with the right stuff installed first:

pip3 install 'dask[complete]'
python3 dask_wikipedia.py pagecounts-with-time-1 output

Roughly the same results as Spark: output split between files, and calculated using all processor cores/​threads.

Dask

While it's running, you can access http://localhost:8787/ to see the job progress.

The DataFrame.visualize() method will produce a graph of the execution plan. [Note: Dask's analysis of the plan is more global than Spark's, so there's no .cache() to worry about.]
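For example (the output filename is just for illustration, and rendering the graph requires Graphviz):

# writes an image of the task graph behind the only_max computation
only_max.visualize(filename='plan.svg')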

Scheduling Dask

The Dask client is implicitly in charge of making sure the calculations happen… some time, somewhere.

client = Client()

This default runs tasks on the local computer, as parallel as possible. Without this line, things still run on the local computer, but with Dask's default local scheduler instead of the distributed one.
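If you want more control over the local cluster, Client accepts sizing arguments; a sketch with arbitrary values:

# explicitly size the local cluster instead of taking the defaults
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')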

But there are also other ways to run Dask code…

Scheduling Dask

You can manually start a Dask cluster with one scheduler (with IP address 192.168.1.2):

dask-scheduler

And several workers on other computers:

dask-worker tcp://192.168.1.2:8786

Then, use that cluster in your code:

client = Client('192.168.1.2:8786')

Scheduling Dask

Or you can deploy Dask jobs on Kubernetes, YARN/Hadoop, or supercomputer infrastructure (MPI, Slurm). Overall, these deployment modes seem to assume shared storage (either a shared filesystem, or HDFS, or S3, or similar).

[I successfully ran a Dask job with YARN on our cluster, but not with HDFS inputs/​outputs.]
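As a sketch of the YARN case, the dask-yarn package builds a cluster object that you hand to a Client (the environment archive and resource sizes here are placeholders):

from dask_yarn import YarnCluster
from dask.distributed import Client

# a packed conda/virtualenv environment shipped to the YARN containers
cluster = YarnCluster(environment='environment.tar.gz',
    worker_vcores=2, worker_memory='4GiB')
client = Client(cluster)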

More Dask Features

Dask also gives you:

  • Arrays: like NumPy, but distributed.
  • Bags: like Spark RDDs, where you have a collection of Python objects to work with.
  • ML: like Scikit-Learn, but on Dask data structures.
  • Delayed & Futures: more advanced scheduling of your computation.
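As a small taste of the array and delayed interfaces (a sketch, not from the demo):

import dask.array as da
from dask import delayed

# a large random array, split into chunks that are processed in parallel
x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.mean().compute())

# delayed wraps plain Python functions into a lazy task graph
@delayed
def inc(n):
    return n + 1

total = delayed(sum)([inc(i) for i in range(10)])
print(total.compute())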

Dask Summary

Dask looks like an extremely promising data science tool.

It's not clear (to me) that Dask has the same tools as Spark to tweak job performance: the optimizer, caching, repartitioning, etc.

The Dask documentation feels incomplete. Or maybe there's an implicit "everything else works like Pandas" qualifier on every function. Except for the things that don't work like Pandas, e.g. there's no .sort_values() method.

Dask Summary

After a very superficial look at Dask, my gut tells me that:

  1. Dask is a great way to adapt NumPy/​Pandas code to a data set that became too big to work with using NumPy/​Pandas.
  2. Spark is probably better designed to work at the truly-big scale.

Ray + Modin

Another pair of tools that deserve mention: Ray and Modin.

With Ray, you compose logic with functions that can run remotely and asynchronously (tasks), and classes that hold some state (actors).

Ray + Modin

The Ray tasks and actors can be scheduled by Ray and run remotely.
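A minimal Ray sketch (not from the course materials) with one task and one actor:

import ray

ray.init()

# a task: a stateless function that Ray can run remotely
@ray.remote
def square(x):
    return x * x

# an actor: a class whose instance keeps state on a worker
@ray.remote
class Counter:
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1
        return self.count

print(ray.get([square.remote(i) for i in range(4)]))  # [0, 1, 4, 9]

counter = Counter.remote()
print(ray.get(counter.increment.remote()))  # 1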

For more: Modern Parallel and Distributed Python: A Quick Tutorial on Ray.

Ray + Modin

Modin provides the functionality of Pandas, on Ray (or Dask). The goal is to be a drop-in replacement:

import modin.pandas as pd

Not all Pandas operations are implemented in Modin; for the ones that aren't, it falls back to the Pandas implementation.
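So existing Pandas code should mostly run unchanged with just the import swapped; a sketch with a made-up filename:

import modin.pandas as pd

# same calls as Pandas, but partitioned and executed by Ray (or Dask) underneath
df = pd.read_csv('pagecounts.csv')
print(df.groupby('lang')['views'].sum())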

Summary

Dask and Ray/Modin are two of many tools in the broad and growing "like Pandas, but faster" category. Others I know about:

Also maybe Spark DataFrames.

It's hard to predict which of these will be important in a few years.