CMPT 732, Fall 2024
We have looked at basically two ways to store data: files (CSV, JSON, Parquet on HDFS, S3, etc) and databases (Cassandra and friends).
Those are good at what they do, but their strengths and weaknesses depend on what we're actually doing with the data.
Spark (and the other DataFrame tools) are designed for "read all of the data and calculate on it" kinds of tasks.
Most of the data we worked with in DataFrames was from files, but there are other options.
Reading files from disk is fine, but if you have 1000 calculations/summaries/reports to do on a data set, the work of reading+uncompressing+parsing the files adds up.
Another category of data tools: databases. On the non-big-data side, the most common choices are relational databases like PostgreSQL, MySQL, Microsoft SQL Server. For larger data sets, the NoSQL databases offer scaling-out: Cassandra, HBase, CouchDB.
Databases tend to be designed more for not-whole-table operations. e.g. select the sales for one customer, insert a row when an item is added to a shopping cart, etc.
Or maybe it's the usual database schemas we design (i.e. normalized, expecting joins) that are designed for these uses.
In other words, databases as we usually think of them are tuned for OLTP (OnLine Transaction Processing) use cases.
This is what you'd expect/need from a database behind a web app or similar.
For data science and big data work, we're usually imagining OLAP (OnLine Analytical Processing) workloads.
e.g. consider all sales since 2020 and find the product purchased the most commonly by users who … .
OLAP-ish queries could be done for many reasons: generating reports, machine learning (e.g. collecting training data), data mining, business intelligence, data science, … .
I'll just say analytics to mean some combination of those things.
OLTP and OLAP aren't mutually exclusive: a database or data layout could do both if queried appropriately.
Maybe we could query the production OLTP database for analytics things?
Problem #1: the database is probably already busy.
The web site (or whatever) probably does a lot of queries and expects them to complete quickly. Everybody's going to be mad if the data science team throws a bunch of huge queries in, slowing down the web site.
Problem #2: it's not really what most relational databases are designed for.
The common relational databases (and the data schemas people usually design for them) are optimized for OLTP.
Problem #3: the data you need might not all be in the same place. It's common to have multiple systems running within an organization, so it's not going to be possible to query across them.
e.g. at SFU, goSFU contains student/course information, HAP has HR and payroll stuff, KUALI keeps track of research grants, … .
But maybe having data in its original format and in many places is okay.
We could just read the data from wherever it already lives, do whatever we need to do with it, and get on with our lives.
This approach is a data lake: data stays in its natural format and location. That could include files, databases, structured/unstructured data, etc. Then just load it as you need to and work with it.
We could imagine using Spark as the core of a data lake: read the data as it is and do the analysis in Spark.
customers = spark.read.jdbc('jdbc:postgresql://dbsvr/sales', dbtable='customers')
sales = spark.read.jdbc('jdbc:postgresql://dbsvr/sales', query='SELECT * FROM sales JOIN products ON …')
employees = spark.read.csv('//hr_share/employees.csv', schema=employee_schema)
logs = spark.read.text('s3://our_webserver_logs/current/')
⋮
Let the Spark optimizer do its best to read efficiently and get the queries done. [There's subtlety to .jdbc reads and its options.]
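For example, a plain JDBC read comes through a single connection as one partition unless you give Spark a way to split it up. A hedged sketch (the table, partition column, and bounds here are made up for illustration):

# A sketch of a partitioned JDBC read; Spark only parallelizes the read if you
# supply a numeric/date partition column and bounds like these.
sales = (
    spark.read.format('jdbc')
    .option('url', 'jdbc:postgresql://dbsvr/sales')
    .option('dbtable', 'sales')
    .option('partitionColumn', 'sale_id')   # hypothetical numeric column
    .option('lowerBound', 1)
    .option('upperBound', 50000000)
    .option('numPartitions', 32)
    .load()
)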
This is also what we'll see Amazon Spectrum doing in assignment 9: read data from S3 (where we imagine it is naturally stored by our organization) and do SQL stuff on it.
Or we could imagine just about any other data analysis tool doing data lake things if it can read the data from wherever it is, including Hive, Apache Impala, Apache Drill, Snowflake.
The data lake approach can make sense if (some combination of): the data is naturally in a good format; … .
Aside #1… if the data you have isn't in the format you want, then what?
e.g. schema for OLTP might not be what you need for OLAP: maybe it makes sense to denormalize or aggregate before doing any OLAP work.
e.g. data from multiple sources need to be combined before real analysis can begin.
e.g. data arrived in a slow-to-process format (like monolithic .json.bz2) but you want something faster (like partitioned .parquet.lz4). This is easy in Spark:
spark.read.json('horrible-download.json.bz2', schema=schema) \
    .repartition(120) \
    .write.parquet('usable-data', compression='lz4')
e.g. that, but split at the command line:
bunzip2 -c horrible.json.bz2 | split -a 5 -d -C 1024M - part-
e.g. load into Cassandra before further processing.
Generally, the solution is to take the format you have, transform to the format you need, and save.
Or Extract-Transform-Load, ETL.
Some things that might be relevant as part of ETL:
ETL can be done with tools you know: Spark, Spark Streaming, just programming.
There are also dedicated tools to define data processing steps: Apache Airflow, Apache Flume, Apache NiFi, AWS Glue, … .
Basically: you probably don't want to do a lot of work on data in a bad format. You can fix it.
If calling it ETL makes you feel better, then great.
Aside #2: not all data has the beautiful schema that you think of a database table or DataFrame as having.
The term unstructured data is generally used for natural language text, or images. Before doing analysis, we probably need to apply other tools to them: natural language processing, image recognition, etc.
There's a lot of data out there that has some regular features, but not as much as a traditional database table: semi-structured data.
This could be something like XML or JSON files where there's variety in the fields present, but not always in the same ways.
For example: OpenStreetMap data, which is the collection of all facts in OSM.
It's XML containing several elements, including <node>s. Some nodes represent amenities. All amenities have a latitude and longitude. Some have a street address. A few are charging stations that have an amperage, but most aren't.
Do all <node>s have similar internal structures? Probably, but I'm not sure exactly what can be guaranteed.
If I wanted to analyze that data, I'd probably do an ETL-like pass over it to filter only the data I want and enforce a schema I understand (and convert to a format that's more efficient to read).
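A hedged sketch of what that pass might look like in Spark, assuming the nodes have already been converted to JSON lines; the path and field names here are made up for illustration:

# A sketch: filter OSM-like JSON-lines node data down to amenities, enforce a
# schema, and save in a faster format. Field names are hypothetical.
from pyspark.sql import types as T

amenity_schema = T.StructType([
    T.StructField('lat', T.DoubleType()),
    T.StructField('lon', T.DoubleType()),
    T.StructField('amenity', T.StringType()),
    T.StructField('name', T.StringType()),
    T.StructField('tags', T.MapType(T.StringType(), T.StringType())),
])

nodes = spark.read.json('osm-nodes.json.gz', schema=amenity_schema)
amenities = nodes.where(nodes['amenity'].isNotNull())   # keep only amenity nodes
amenities.write.parquet('osm-amenities', mode='overwrite')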
Another possibility where data isn't as perfectly tabular as we usually imagine: schemaless or flexible/irregular schemas.
This could be as simple as a field that's often missing (null). It could mean fields that are found only in a small subset of rows.
Imagine you have been collecting web logs: requesting IP address, path, response code, etc. Then one day you realize you'd also like to capture the server that handled the request.
In a traditional database that would involve adding a column (default null) and using it. In a tool designed for flexible schema it might just mean starting to insert records with a server field and letting it handle it.
Or maybe you can only collect some fields outside the EU, or some aren't relevant unless there's an error, or … .
Some tools are designed with flexible schemas in mind, like Redis, Mongo.
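For the web-log example above, a hedged sketch with MongoDB's pymongo driver (the database and collection names are made up): records with and without the new server field can live in the same collection, with no schema change.

# A pymongo sketch; connection, database, and collection names are hypothetical.
from pymongo import MongoClient

logs = MongoClient('mongodb://localhost:27017')['web']['request_logs']
# Old-style record: no 'server' field.
logs.insert_one({'ip': '192.0.2.10', 'path': '/', 'status': 200})
# New-style record: just start including the field.
logs.insert_one({'ip': '192.0.2.11', 'path': '/cart', 'status': 500, 'server': 'web03'})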
Some can if needed, like PostgreSQL's JSONB type and JSON_TABLE.
Maybe the OSM data could be thought of more like this: we could think of the data having an amperage column that's null almost everywhere, except in rows that are car charger amenities.
For example, using PostgreSQL's JSONB type, the OSM amenity data might be stored like this:
CREATE TABLE amenities (
    latitude FLOAT,
    longitude FLOAT,
    data JSONB
);
Data can be inserted like this:
INSERT INTO amenities VALUES
    (49.2, -122.9, '{"name": "thing", "foo": 1, "bar": "x"}'),
    (49.3, -123.1, '{"name": "charger", "amperage": 4}');
Then we can query against the JSON data:
SELECT data -> 'name' AS name FROM amenities;
SELECT * FROM amenities WHERE JSON_EXISTS(data, '$.amperage');
SELECT * FROM amenities WHERE (data -> 'amperage') :: float > 2;
The JSON_TABLE feature (new in Postgres 17) can be used to temporarily apply a schema to JSON data so you can work with it as a structured view.
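A hedged sketch of that against the amenities table above, driven from Python with the psycopg2 driver (the connection details are made up; needs PostgreSQL 17):

# A sketch: JSON_TABLE pulls fields out of the JSONB column into typed columns.
# The database name and connection string are hypothetical.
import psycopg2

conn = psycopg2.connect('dbname=osm')
with conn, conn.cursor() as cur:
    cur.execute('''
        SELECT a.latitude, a.longitude, jt.name, jt.amperage
        FROM amenities AS a,
             JSON_TABLE(a.data, '$' COLUMNS (
                 name TEXT PATH '$.name',
                 amperage FLOAT PATH '$.amperage'
             )) AS jt
    ''')
    for row in cur.fetchall():
        print(row)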
Other flexible/unstructured tools like Mongo can do similar tricks to let you do structured-like things with less structured data.
Also note Spark functions like from_json that will parse JSON strings into data structures that could then be turned into structured columns.
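A hedged sketch, reusing the amenities idea: here df is assumed to be a DataFrame with latitude, longitude, and a JSON-string data column, and the schema is made up.

# A from_json sketch: parse a JSON string column into a struct, then select
# ordinary columns out of it. df and its columns are assumptions.
from pyspark.sql import functions as F, types as T

amenity_schema = T.StructType([
    T.StructField('name', T.StringType()),
    T.StructField('amperage', T.DoubleType()),
])

parsed = (
    df.withColumn('parsed', F.from_json(F.col('data'), amenity_schema))
      .select('latitude', 'longitude', 'parsed.name', 'parsed.amperage')
)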
If you have lots of analysis to do, it probably makes sense to have the data in a tool designed for that. Thus we invent a data warehouse.
The idea: take all of the data you need to query for analytics, and get it all into one place (the data warehouse) so you can query efficiently.
A data warehouse could be a regular relational database that mirrors the original data (e.g. a read-only replica or nightly data dump) and is used for analytic queries.
It could be a dump of relevant data into Parquet files (possibly Hive partitioned in a relevant way) or a similar reasonably-efficient format.
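For the Parquet-dump flavour, a hedged Spark sketch (sales is an assumed DataFrame and the column and path names are made up): partition by something the analytic queries will filter on, so they read less data.

# A sketch: write Hive-partitioned Parquet for later analytic queries.
sales.write.partitionBy('sale_date') \
    .parquet('s3://warehouse/sales', mode='overwrite', compression='lz4')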
Or it could be a distinct data warehouse system that aggregates data from many sources and lets them be queried efficiently. In Assignment 9, we'll see Amazon Redshift doing this. Others: Google BigQuery, Teradata, … .
Many (most? all?) of the data warehouse tools use SQL as their primary interaction with the data. Many people know it, so it seems like an obvious choice.
Getting data from its home to a data warehouse could be a simple copy, or could be an ETL task that reshapes the data into something that's better for your queries.
Or you could load the raw data into the warehouse, then query to reshape, and save those results as a new table, an ELT (extract-load-transform).
The decision to have a data warehouse probably depends on how much data, how many data sources, how many analytic queries are being done, etc.
As an example, I'm going to do the Weather ETL task from assignment 4 with a few extras: I'll convert the dates (i.e. "20131231") to an actual date type and aggregate by date to find the maximum Canadian temperature on each day.
That seems like a not-totally-unreasonable data analysis task.
Doing this in Spark on my computer for the weather-3 data (CSV files) took ≈4 s (just the calculation: ignoring Spark startup time). The same operation in Pandas took ≈15 s.
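For reference, roughly what the Spark version might look like (a sketch, not the exact code I timed; the column names match the weather schema used in the ClickHouse query below, and observation_schema is assumed to be defined):

# A sketch of the Spark version of the task: filter Canadian TMAX observations,
# convert the date strings, and find the daily maximum temperature.
from pyspark.sql import functions as F

weather = spark.read.csv('weather-3', schema=observation_schema)
maxdate = (
    weather
    .where(
        (weather['observation'] == 'TMAX')
        & weather['station'].startswith('CA')
        & (weather['qflag'] == '')
    )
    .withColumn('date', F.to_date(weather['date'], 'yyyyMMdd'))
    .withColumn('tmax', weather['value'] / 10)
    .groupBy('date').agg(F.max('tmax').alias('max_temp'))
    .orderBy('date')
)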
Let's look at ClickHouse, which is described as a fast open-source OLAP database.
It seems like it would be a good basis for a data warehouse. I'm going to do the weather analysis with a data warehouse workflow. (Technically an ELT, I guess, since I'm putting the results into a materialized table.)
First, we need a ClickHouse server.
docker run -p 8123:8123 clickhouse/clickhouse-server
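The client methods used in the snippets below (insert_arrow, command, query_df) match the clickhouse-connect Python package; a sketch of the setup I'm assuming:

# Connect to the ClickHouse server started above; this is the `client`
# object used in the following snippets.
import clickhouse_connect

client = clickhouse_connect.get_client(host='localhost', port=8123)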
Then get the data from the CSV files into it. I started with an appropriate CREATE TABLE weather…. Then I loaded the data into a Polars DataFrame, converted that to Arrow, and loaded that into the ClickHouse table.
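The table definition was something along these lines (a guess at the details: the columns and types just need to match the input CSV and the query further down, and the engine choice is one reasonable option):

# A hedged guess at the CREATE TABLE; exact columns depend on the assignment data.
client.command('''
    CREATE TABLE weather (
        station String,
        date String,
        observation String,
        value Int32,
        mflag String,
        qflag String,
        sflag String,
        obstime String
    ) ENGINE = MergeTree ORDER BY (station, date)
''')

Then the load itself: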
weather_df = pl.read_csv(input, has_header=False, new_columns=colnames,
                         schema_overrides={'date': pl.Utf8})
client.insert_arrow('weather', weather_df.to_arrow())
That worked (in ≈18 s) and got the data into my data warehouse.
We imagine that I now have many queries to do on that data, but I only have one. The whole task can be done as one big query, materializing it into a table…
client.command('''
    CREATE TABLE maxdate Engine=Log AS
    SELECT date, MAX(tmax) AS max_temp
    FROM (
        SELECT station, parseDateTime(date, '%Y%m%d') AS date, value/10 AS tmax
        FROM weather
        WHERE observation='TMAX' AND station LIKE 'CA%' AND qflag=''
    )
    GROUP BY date
    ORDER BY date
''')
Then I can query into a Pandas DataFrame and have a look, or query in various other ways and do whatever I want with the data.
maxtemp = client.query_df('SELECT * FROM maxdate')
Time taken (excluding the data import): 0.1 s.
I could have also just done .query_df with the original query, thus getting the results directly into a Pandas DataFrame without storing it. Or I could have materialized the sub-SELECT in the original query into a table (cleaned_weather or something) if I wanted to use it again for something else.
There's a web frontend where you can run queries directly (much less full-featured than Redshift/Spectrum, but probably still useful).
Individual tables can be stored with one of several table engines designed for different workloads.
They have put a lot of work into a JSON data type for efficient flexible-schema data.
ClickHouse can be deployed as a cluster and can then shard & replicate tables across multiple servers. [I haven't tried it, but it puts ClickHouse into the big data tools category in my mind.]
Data lakehouse is another category people refer to. As best as I can tell through marketing drivel, it means:
The term is from Databricks and is often paired with the Delta Lake format (also from Databricks). Delta Lake is Parquet files with an extra layer to provide ACID guarantees.
Delta Lake plays well with Spark and might be worth remembering, even if you don't know what a data lakehouse is.
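From Spark it looks like just another format, give or take some session configuration; a hedged sketch (assumes the delta-spark package is installed and configured, events is an assumed DataFrame, and the path is made up):

# Write and read a Delta table from Spark; names and path are hypothetical.
events.write.format('delta').mode('append').save('s3://lakehouse/events')
history = spark.read.format('delta').load('s3://lakehouse/events')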
The costs of a data warehouse: another system to run, another copy of the data to store, data must be ingested regularly to keep it fresh, etc.
But the benefit: it's now in a system designed for fast OLAP work.
Don't fall into the trap of thinking analytics tool == data warehouse. There's a tradeoff to be made when introducing any new piece of technology.
You know many ways to query data. Choose the right one for the situation you're in.
Consider transforming the data before analysis: it might be worth it.