CMPT 732, Fall 2024
So far, we have HDFS to store “files” but no way to read or write records: specifically selected subsets of our data set that we can get at quickly.
That's usually the job of a database.
For big data, we need a database that will split the storage & queries across many nodes.
But full ACID guarantees are much harder in a distributed system where there is no central controller. Also, some operations are going to be hard in a distributed database…
We (probably) need something simpler than a traditional SQL database.
Enter non-relational, or “NoSQL”, databases.
Motivated by the need for distributed databases: simplified data model, operations limited to what can be distributed efficiently.
e.g. Cassandra, CouchDB, MongoDB, Redis, HBase.
There are many different technologies described as “NoSQL”, but we generally have to give up some/most of: full ACID guarantees, and the expressiveness of SQL queries.
In exchange, we get: storage and queries that can be distributed efficiently across many nodes.
We will be using the Cassandra database. It's not the only non-relational database that makes sense for big data, but it's a good one.
For Cassandra specifically, that means no JOIN and only a very restricted GROUP BY.

The usual Cassandra setup is to have n nodes, each acting as part of the cluster. They coordinate with each other and decide how to partition the data so it's evenly distributed and redundant.
A client can (initially) connect to any of them to make queries.
If we have many independent database nodes, it seems like a good match for YARN's many independent workers.
And it is: it's natural to create a MapReduce/Spark job that reads or writes Cassandra data. Each worker can read/write to a Cassandra node to distribute the load. The worker and database could even be on the same machine.
A Cassandra cluster has many keyspaces. Each keyspace has many tables.
A table is what you expect: columns of data; rows that have a cell for each column; columns have a fixed type; a primary key determines how the data is organized.
The keyspace is a container for some tables but also…
A keyspace has rules about how it's replicated: can be a simple replication factor, or a description of how many replicas should be in each datacentre around the world.
For example, in the cqlsh shell:
cqlsh> CREATE KEYSPACE demo WITH REPLICATION =
... { 'class': 'SimpleStrategy', 'replication_factor': 3 };
cqlsh> USE demo;
cqlsh:demo> CREATE TABLE test (
... i1 INT, i2 INT, data TEXT,
... PRIMARY KEY (i1,i2) );
In this keyspace, every row will be replicated on three nodes.
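The datacentre-aware kind of rule uses NetworkTopologyStrategy instead, with a replica count per datacentre. A sketch (the datacentre names “east” and “west” are made up):

cqlsh> CREATE KEYSPACE demo2 WITH REPLICATION =
... { 'class': 'NetworkTopologyStrategy', 'east': 3, 'west': 2 };

Rows in demo2 would get three replicas in one datacentre and two in the other.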
The first component of the primary key is the partition key. It controls which node(s) store the record. Records with the same partition key will all be on the same nodes.
We had PRIMARY KEY (i1,i2), so i1 is the partition key. Any records with the same i1 will be stored on the same nodes.
The primary key has to be unique: (i1,i2) must be unique for each row. The first part of the primary key, i1, controls how the data is partitioned.
The primary key has two roles: unique row identifier, and decider of data partitioning.
Some of the data types you can have in Cassandra columns are what you'd expect: INT, BIGINT, BLOB, DATE, TEXT (= VARCHAR).
Some you might not: LIST&lt;t&gt;, SET&lt;t&gt;, MAP&lt;t,u&gt;.
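As a quick sketch (the table and column names here are invented), a table can mix scalar and collection columns:

cqlsh:demo> CREATE TABLE profiles (
... userid INT PRIMARY KEY,
... emails SET&lt;TEXT&gt;,
... settings MAP&lt;TEXT,INT&gt; );
cqlsh:demo> INSERT INTO profiles (userid, emails, settings)
... VALUES (1, {'aa@example.com'}, {'volume': 11});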
Cassandra's query language is CQL. It is not entirely unlike SQL.
Basically: it's as much like SQL as possible, while expressing Cassandra's semantics. We saw CREATE TABLE, which looks familiar.
Inserting and selecting seems to work the way you'd expect:
cqlsh:demo> INSERT INTO test (i1,i2,data) VALUES (1,2,'aaa');
cqlsh:demo> INSERT INTO test (i1,i2,data) VALUES (2,3,'bbb');
cqlsh:demo> SELECT * FROM test;
i1 | i2 | data
----+----+------
1 | 2 | aaa
2 | 3 | bbb
… and it works like you'd expect, right up until it doesn't.
cqlsh:demo> SELECT * FROM test WHERE i1=1;
i1 | i2 | data
----+----+------
1 | 2 | aaa
cqlsh:demo> SELECT * FROM test WHERE i2=2;
InvalidRequest: Error from server: code=2200 [Invalid query]
message=Cannot execute this query as it might involve data
filtering and thus may have unpredictable performance. If you
want to execute this query despite the performance
unpredictability, use ALLOW FILTERING
Even though i2 is part of the primary key, we haven't filtered by i1, so WHERE i2=2 implies a full table scan.
The primary key determines the layout of data on the nodes/disk. Accessing data in any other way is potentially very expensive.
cqlsh:demo> SELECT * FROM test WHERE i2=2 ALLOW FILTERING;
i1 | i2 | data
----+----+------
1 | 2 | aaa
CQL INSERT isn't really an insert: it's “insert or update by primary key”. Since we had PRIMARY KEY (i1,i2):
cqlsh:demo> INSERT INTO test (i1,i2,data) VALUES (1,2,'ccc');
cqlsh:demo> SELECT * FROM test;
i1 | i2 | data
----+----+------
1 | 2 | ccc
2 | 3 | bbb
(2 rows)
That implies that primary keys must be unique. Sometimes the easiest way around that is to just use a UUID.
cqlsh:demo> CREATE TABLE test2 ( id UUID PRIMARY KEY, data TEXT );
cqlsh:demo> INSERT INTO test2 (id,data) VALUES (UUID(), 'ddd');
cqlsh:demo> INSERT INTO test2 (id,data) VALUES (UUID(), 'eee');
cqlsh:demo> SELECT * FROM test2;
id | data
--------------------------------------+------
aa2364f4-a2c6-4cc1-bb8a-0810f614da6c | ddd
c7b473f6-cf4b-47cd-935d-f5249864973c | eee
(2 rows)
You can GROUP BY in CQL, but you must include the partition key. This allows aggregation to happen locally on each node, with no data shuffling.
This was added in Cassandra 3.10. [Our cluster is running 4.1.1.]
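For example, grouping our test table by its partition key i1 should look something like this:

cqlsh:demo> SELECT i1, COUNT(*) FROM test GROUP BY i1;
 i1 | count
----+-------
  1 |     1
  2 |     1
(2 rows)

Each node can count its own partitions and just return the totals.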
The replication factor in the keyspace is what lets Cassandra handle node and network failures, and recover from them gracefully.
Copies of the data are made according to the replication settings (three copies here, with the first field as the partition key). With replication factor n, up to n-1 of the nodes holding a record can fail and we can still read and write that data. For writes, the live nodes storing that partition key do the write; the others catch up when they come back.
If the cluster partitions, we may still be able to read/write.
… datacentre-aware replication will help.
If there are writes, data will be inconsistent during the partition.
Again, writes will catch up when nodes can communicate.
What happens to conflicting writes during a partition?
Every cell in Cassandra has a timestamp: when it was inserted/updated.
cqlsh:demo> SELECT data, WRITETIME(data) FROM test;
data | writetime(data)
------+------------------
ccc | 1729620581060409
bbb | 1729620551253732
(2 rows)
Most-recent timestamp wins if there's a consistency question.
This means that Cassandra has eventual consistency: the data will be consistent, but with some delay.
Usually the delay will be short (network latency + milliseconds). If there's a network partition, it will be longer (after communication is restored).
When the cluster has failed nodes or a partition, we may still be able to read/write. “May”?
Cassandra gives us a choice of how consistent we demand to be with our reads and writes. We can have different requirements for consistency for each session/query.
The levels of consistency let you be very expressive about your requirements.
With the cluster partitioned, we would expect:
cqlsh:demo> CONSISTENCY ONE;
cqlsh:demo> SELECT * FROM test WHERE i1=1; -- succeeds
cqlsh:demo> CONSISTENCY ALL;
cqlsh:demo> SELECT * FROM test WHERE i1=1; -- fails
There are many options. For SELECT/read operations:
- ONE: any one node (with each record) can tell us.
- TWO: any two nodes (with each record) can tell us.
- LOCAL_QUORUM: >half of the nodes in this datacentre with that data must respond.
- QUORUM: >half of the nodes with that data must respond.
- ALL: every node with that data must respond.

Which you choose will depend on how much consistency you want to wait for.
For INSERT/UPDATE/write operations to succeed/return:
- ONE: any one node must write.
- TWO: any two nodes must write.
- LOCAL_QUORUM: >half of the nodes in this datacentre that will store the data must write.
- QUORUM: >half of the nodes that will store the data must write.
- EACH_QUORUM: >half of the nodes in every datacentre must write.
- ALL: every node that will store this record must write.

Which consistency level you choose will likely depend on the application.
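Write consistency is chosen the same way as read consistency. A sketch in cqlsh, continuing with our test table:

cqlsh:demo> CONSISTENCY QUORUM;
Consistency level set to QUORUM.
cqlsh:demo> INSERT INTO test (i1,i2,data) VALUES (4,5,'fff');

The INSERT returns once more than half of the replicas for partition i1=4 have written it.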
Cassandra gives us the choice of “real” consistency if you need it (at the price of not working while partitioned), or you can trade a small probability of inconsistency or data loss for speed.
Since Cassandra has no JOIN operation, working with relational data is going to be tricky. We have lots of experience building tables that are related by foreign keys and joining them.
That was a good way to model lots of kinds of data, and a convenient way to work with it. Sadly, we have to give it up to easily do distributed computation.
If we have relational data, there are now several options to deal with it:
- Do the join in Spark (with .join()).
- Do the join in our own code (look up the keys we need and SELECT them).
- Restructure the data so no join is needed.

One common trick: abandon data normalization, and have more than one copy of a particular fact.
The goal is to have the data you need right there, with the other data you're about to query.
Example: the usual thing in a relational database would be to join these tables to produce a class list.
| Course   | Grade | Student # |
|----------|-------|-----------|
| CMPT 123 | B-    | 400000123 |
| CMPT 123 | A     | 400000124 |
| CMPT 189 | C-    | 400000124 |

| Student # | Name           |
|-----------|----------------|
| 400000123 | Aaron Anderson |
| 400000124 | Barb Brooks    |
If joins are impossible/expensive and we know we need to produce class lists often, we could store the table as:
| Course   | Grade | Student # | Name           |
|----------|-------|-----------|----------------|
| CMPT 123 | B-    | 400000123 | Aaron Anderson |
| CMPT 123 | A     | 400000124 | Barb Brooks    |
| CMPT 189 | C-    | 400000124 | Barb Brooks    |
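A sketch of that denormalized table in CQL (the table and column names are invented): making the course the partition key means producing a class list is a single-partition read.

cqlsh:demo> CREATE TABLE class_list (
... course TEXT, studentnum BIGINT, grade TEXT, name TEXT,
... PRIMARY KEY (course, studentnum) );
cqlsh:demo> SELECT studentnum, name, grade FROM class_list
... WHERE course='CMPT 123'; -- one partition: fast, no ALLOW FILTERING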
Denormalizing the data effectively requires that we know ahead of time what queries we will perform. (e.g. if we didn't need to produce class lists, that structure would be stupid.)
When updating data, we have to be careful to update every instance of it. That could be very tricky.
Lesson: denormalizing data is a bad idea. But sometimes it's the least-bad idea available.