## Imports and Setup

In [None]:
import sys, os
import glob
import timeit
import numpy as np
import pandas as pd
import numexpr
# import modin.pandas as mpd
import dask.dataframe as dd

In [None]:
# Create a dask.distributed client with default settings. Per the
# dask.distributed documentation, Client() with no arguments starts a local
# cluster and registers itself as the default scheduler, so the Dask
# computations later in this notebook should run through it.
from dask.distributed import Client
client = Client()

In [None]:
# Locate the benchmark inputs, which live next to each other in the user's
# home directory:
#   ~/osm-data-monolith/part*   -> a single Parquet file holding every point
#   ~/osm-data-partitioned      -> the same data split across several files
#
# expanduser('~') honours $HOME when it is set but also works where it is not
# (e.g. on Windows), instead of raising KeyError.
path_base = os.path.join(os.path.expanduser('~'), 'osm-data')
partitioned_directory = path_base + '-partitioned'

# glob order is arbitrary, so sort for a reproducible choice, and fail with a
# clear message (rather than a bare IndexError) when the data is missing.
monolith_candidates = sorted(glob.glob(path_base + '-monolith/part*'))
if not monolith_candidates:
    raise FileNotFoundError(
        'no part* file found in ' + path_base + '-monolith'
    )
monolith_file = monolith_candidates[0]

## Haversine Distance Calculations
These implement the "path distance" calculation, returning a number in metres.

In [None]:
def distance(points):
    """
    Total haversine path length, in metres, along consecutive rows of `points`.

    `points` is a DataFrame-like object with 'lat' and 'lon' columns (degrees;
    they are converted to radians below). Only `shift`, column selection,
    NumPy ufuncs and `sum` are used on it. Each row is paired with the row
    after it and the per-segment great-circle distances are summed.
    """
    # adapted from http://stackoverflow.com/a/21623206/1236542
    deg2rad = 0.017453292519943295  # pi / 180

    # Row i of `following` holds point i+1; the final row has no successor
    # and becomes NaN, which the closing sum() leaves out.
    following = points.shift(-1)
    start_lat, start_lon = points['lat'], points['lon']
    end_lat, end_lon = following['lat'], following['lon']

    # Haversine term, built from cosines only.
    lat_term = np.cos((end_lat - start_lat) * deg2rad) / 2
    lon_term = (
        np.cos(start_lat * deg2rad) * np.cos(end_lat * deg2rad)
        * (1 - np.cos((end_lon - start_lon) * deg2rad)) / 2
    )
    hav = 0.5 - lat_term + lon_term

    # 12742 km is the Earth's mean diameter (2 * 6371 km); * 1000 gives metres.
    segment_lengths = 1000 * 12742 * np.arcsin(np.sqrt(hav))

    return segment_lengths.sum()

In [None]:
def distance_numexpr(points):
    """
    Return sum of distances between points, in metres, using numexpr to do the work.

    `points` is a pandas DataFrame with 'lat' and 'lon' columns (degrees; they
    are converted to radians inside the expression). Each row is paired with
    the row after it and the haversine distance of every such segment is
    summed. The input frame is never modified.

    Returns a Python float; 0.0 when there are fewer than two points, since
    there is then no segment to measure.
    """
    # adapted from http://stackoverflow.com/a/21623206/1236542
    if len(points) < 2:
        return 0.0

    lat = np.asarray(points['lat'])
    lon = np.asarray(points['lon'])

    # Pair point i with point i+1 by slicing instead of shift(-1) + drop():
    # no trailing NaN row is ever created (numexpr's sum would propagate it),
    # and no in-place mutation of Series taken from the caller's frame occurs.
    operands = {
        'lat1': lat[:-1],
        'lon1': lon[:-1],
        'lat2': lat[1:],
        'lon2': lon[1:],
        'p': 0.017453292519943295,  # pi / 180, degrees -> radians
    }

    # 12742 km is the Earth's mean diameter; * 1000 converts km to metres.
    dist = '''sum(
        1000 * 12742 * arcsin(sqrt(
            0.5 - cos((lat2 - lat1) * p)/2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
        ))
    )'''

    # Pass the operands explicitly rather than relying on numexpr inspecting
    # this function's local variables.
    return numexpr.evaluate(dist, local_dict=operands).item()

## Implementations With Different Libraries
Various implementations of (1) read data from Parquet, and (2) calculate and return the distance between points.

In [None]:
def go_pandas(inputs):
    """
    Read the Parquet data at `inputs` into a Pandas DataFrame (pyarrow engine)
    and return the result of distance() on it.
    """
    return distance(pd.read_parquet(inputs, engine='pyarrow'))

In [None]:
def go_dask(inputs):
    """
    Read the Parquet data at `inputs` as a Dask DataFrame and return the
    computed result of distance() on it.

    Notes:
    - uses Dask's read_parquet to create a Dask DataFrame
    - the call to distance() is identical to the Pandas version: that code is
      completely unchanged
    - .compute() fires off the lazy calculation
    """
    lazy_points = dd.read_parquet(inputs, engine='pyarrow')
    lazy_total = distance(lazy_points)
    return lazy_total.compute()

In [None]:
def go_modin(inputs):
    """
    Read the Parquet data at `inputs` with Modin and return the result of
    distance() on it.

    Modin seems like it should work too... but it doesn't.

    NOTE: this relies on `mpd` (modin.pandas) being in scope; the import at
    the top of the notebook is commented out, so calling this function as-is
    raises NameError.
    """
    return distance(mpd.read_parquet(inputs))

In [None]:
def go_numexpr(inputs):
    """
    Read the Parquet data at `inputs` into a Pandas DataFrame (pyarrow engine)
    and return the result of distance_numexpr() on it.

    Notes:
    - uses a Pandas DataFrame
    - the distance calculation had to be reimplemented to use numexpr
    """
    return distance_numexpr(pd.read_parquet(inputs, engine='pyarrow'))

## Check the Implementations
For each implementation, call it to make sure we get the right result, and then time it.

### Pandas reading a single input file

In [None]:
go_pandas(monolith_file)

In [None]:
%timeit go_pandas(monolith_file)

### Pandas + numexpr reading a single input file

In [None]:
go_numexpr(monolith_file)

In [None]:
%timeit go_numexpr(monolith_file)

### Dask reading a single input file

In [None]:
go_dask(monolith_file)

In [None]:
%timeit go_dask(monolith_file)

### Dask reading data partitioned into several files

In [None]:
go_dask(partitioned_directory)

In [None]:
%timeit go_dask(partitioned_directory)

## Dask Task Visualization

In [None]:
# Build the same distance calculation that go_dask() runs on the partitioned
# data, but stop before .compute(): `d` stays a lazy Dask object so that its
# task graph can be inspected.
points = dd.read_parquet(partitioned_directory, engine='pyarrow')
d = distance(points)

In [None]:
d.visualize()