CMPT 732, Fall 2021
Another tool that deserves mention: Dask.
The idea: recreate the API from Pandas (and NumPy and Scikit-Learn) as much as possible, but do lazy evaluation, and allow distributed computation (like Spark).
As a demo, let's recreate the Most-Viewed Wikipedia Pages solution in Dask…
Basic setup: import the Dask DataFrame functionality, and create a client to work on the local machine.
import sys
import dask.dataframe as dd
from dask.distributed import Client

def main(input_dir, output_dir):
    client = Client()
We can construct a DataFrame from a collection of files (like Spark), using almost the Pandas function call:
colnames = ['timestamp', 'lang', 'title', 'views', 'bytes']
pageviews = dd.read_csv(input_dir + '/*.gz', sep=' ',
    blocksize=None, compression='gzip',
    header=None, names=colnames)
The file glob (*.gz) wasn't necessary in Spark and isn't possible in Pandas. The blocksize argument doesn't exist in Pandas, but can be used to give information about how to partition unpartitioned input; here it's None because gzipped files can't be split into blocks.
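For example, with uncompressed input you could ask Dask for roughly 64 MB partitions (a sketch, not part of the Wikipedia example; the glob is made up):

pageviews = dd.read_csv(input_dir + '/*.csv', sep=' ',
    blocksize='64MB', header=None, names=colnames)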
After that, the work is done with code that would be exactly the same in Pandas:
en_only = pageviews[pageviews['lang'] == 'en']
pages = en_only[(en_only['title'] != 'Main_Page')
    & (~en_only['title'].str.startswith('Special:'))]
max_views = pages[['timestamp', 'views']] \
    .groupby('timestamp').max().reset_index()
only_max = pages.merge(max_views, on=['timestamp', 'views'])
Saving the results is also exactly like Pandas.
only_max.to_json(output_dir, orient='records', lines=True, compression='gzip')
… except all calculations happen now, and it produces a directory of files, like Spark.
Run it like normal Python code, but with the right stuff installed first:
pip3 install dask[complete]
python3 dask_wikipedia.py pagecounts-with-time-1 output
Roughly the same results as Spark: output split between files, and calculated using all processor cores/threads.
While it's running, you can access http://localhost:8787/ to see the job progress.
The DataFrame.visualize() method will produce a graph of the execution plan. [Note: the analysis is more global, so .cache() isn't needed.]
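For example (assuming Graphviz is installed; the output filename is arbitrary):

only_max.visualize(filename='plan.svg')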
The client object is implicitly in charge of making sure the calculations happen… some time, somewhere.
client = Client()
This default runs tasks on the local computer, as parallel as possible. Without this line, things run on the local computer, single-threaded like Pandas.
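If you want to control the local parallelism explicitly, the Client constructor accepts arguments for that (a sketch; the numbers are arbitrary):

client = Client(n_workers=4, threads_per_worker=2)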
But there are also other ways to run Dask code…
You can manually start a Dask cluster with one scheduler (with IP address 192.168.1.2):
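(Approximately; the exact command-line invocation depends on the Dask version.)

dask-scheduler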
And several workers on other computers:
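(Again approximately; 8786 is the scheduler's default port, matching the address used below.)

dask-worker 192.168.1.2:8786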
Then, use that cluster in your code:
client = Client('192.168.1.2:8786')
Or you can deploy Dask jobs on Kubernetes, YARN/Hadoop, or supercomputer infrastructure (MPI, Slurm). Overall, these deployment options seem to assume shared storage (a shared filesystem, HDFS, S3, or similar).
[I successfully ran a Dask job with YARN on our cluster, but not with HDFS inputs/outputs.]
Dask also gives you more than DataFrames: NumPy-like arrays (dask.array), Scikit-Learn-style machine learning (dask-ml), and lower-level task APIs (dask.delayed and futures).
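A tiny sketch of the array side (illustrative, not from the Wikipedia example):

import dask.array as da
x = da.random.random((100000, 1000), chunks=(10000, 1000))  # lazy, chunked array
spread = (x - x.mean(axis=0)).std(axis=0)                    # still lazy
print(spread.compute())                                      # now the work happens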
Dask looks like an extremely promising data science tool.
It's not clear (to me) that Dask has the same tools as Spark to tweak job performance: the optimizer, caching, repartitioning, etc.
The Dask documentation feels incomplete. Or maybe there's an implicit qualifier "everything else works like Pandas" on every function. Except things that don't, e.g. no …
After a very superficial look at Dask, my gut tells me that:
Ray is another framework for distributed computation in Python. With Ray, you compose logic with functions that can run remotely and asynchronously (tasks) and classes that hold some state (actors); Ray schedules these tasks and actors and runs them, possibly on other machines.
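A minimal sketch of the two concepts (illustrative code, not from the course material):

import ray
ray.init()  # local; ray.init(address=...) would join an existing cluster

@ray.remote
def square(x):          # a task: can run asynchronously, possibly on another machine
    return x * x

@ray.remote
class Counter:          # an actor: an object whose state persists between remote calls
    def __init__(self):
        self.total = 0
    def add(self, n):
        self.total += n
        return self.total

futures = [square.remote(i) for i in range(4)]   # returns futures immediately
print(ray.get(futures))                          # [0, 1, 4, 9]
counter = Counter.remote()
print(ray.get(counter.add.remote(10)))           # 10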
Modin provides the functionality of Pandas, on Ray (or Dask). The goal is to be a drop-in replacement:
import modin.pandas as pd
Not all Pandas operations are implemented in Modin; for those that aren't, it falls back to the Pandas implementation.
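For example (a sketch with a made-up input file; the code is exactly what you would write in plain Pandas):

import modin.pandas as pd
views = pd.read_csv('pagecounts-sample.csv')
print(views.groupby('lang')['views'].sum())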
Dask and Ray/Modin are two of many tools in the broad and growing category of "like Pandas, but faster". Others I know about:
Also maybe Spark DataFrames.
It's hard to predict which of these will be important in a few years.