Why MapReduce?

CMPT 732, Fall 2023

Why MapReduce?

Why did MapReduce get created the way it was?

Why is the Hadoop cluster infrastructure (YARN, HDFS) structured the way it is?

This is a good time for a little context…

MapReduce History

The original publication: MapReduce: Simplified Data Processing on Large Clusters, 2004.

Highlights:

  • Many real world tasks are expressible in this model, as shown in the paper.
  • Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning…, scheduling…, failures…, communication.

MapReduce History

  • Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages. We realized that most of our computation involved applying a map operation… and then applying a reduce operation…
  • map: (k1,v1) → list(k2,v2)
    reduce: (k2,list(v2)) → list(v2)
  • A cluster consists of hundreds or thousands of machines, and therefore machine failures are common.
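To make those signatures concrete, here is a minimal word-count sketch in plain Python (illustration only: the function names and the tiny driver loop are made up here, not Hadoop's API):

# Minimal word-count sketch of the MapReduce model in plain Python.
from collections import defaultdict

def map_func(key, value):          # (k1, v1) -> list(k2, v2)
    # key: line number, value: line of text
    return [(word, 1) for word in value.split()]

def reduce_func(key, values):      # (k2, list(v2)) -> list(v2)
    # key: word, values: all the 1s emitted for that word
    return [sum(values)]

# A tiny "framework": shuffle the map output by key, then reduce each group.
lines = ["the quick brown fox", "the lazy dog"]
groups = defaultdict(list)
for lineno, line in enumerate(lines):
    for k2, v2 in map_func(lineno, line):
        groups[k2].append(v2)

counts = {k: reduce_func(k, vs) for k, vs in groups.items()}
print(counts)   # {'the': [2], 'quick': [1], ...}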

MapReduce History

The Hadoop implementation of MapReduce (that we used) was inspired by this paper.

… and it seems to be a fairly faithful implementation.

MapReduce History

MapReduce lets us do computation in parallel in a very scalable way. Compared to other options at the time, it was very easy and flexible.

But now it seems very restrictive.

MapReduce History

How would you implement an iterative algorithm (gradient descent, logistic regression, PageRank, etc) with MapReduce?

Probably something like:


boolean converged = false;
int iter = 0;
while ( !converged && iter < iter_limit ) {
  Job job = Job.getInstance(conf, "gradient descent");
  ⋮
  // each iteration reads the previous iteration's output and writes its own
  // (the first iteration would read the real input instead of "temp-1")
  TextInputFormat.addInputPath(job, new Path("temp" + (iter - 1)));
  TextOutputFormat.setOutputPath(job, new Path("temp" + iter));
  job.waitForCompletion(true);
  converged = …;
  iter++;
}

Fault Tolerance

Computers fail. The more machines there are in a cluster, the more likely it is that something fails…

Fault Tolerance

If each machine fails with probability 0.1% (in one day/​week/​month/​year), then…

Machines in Cluster    Failure Probability
1                      0.1%
10                     1.0%
100                    9.5%
1000                   63.2%
10,000                 99.996%
\(n\)                  \(1 - (1-0.001)^n\)
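The right-hand column is just \(1 - (1-0.001)^n\); a quick way to compute it (with the assumed 0.1% per-machine failure probability from the slide):

# Probability that at least one of n machines fails, assuming each fails
# independently with probability p = 0.1% over the same period.
p = 0.001
for n in (1, 10, 100, 1000, 10_000):
    prob_any_failure = 1 - (1 - p) ** n
    print(f"{n:>6} machines: {prob_any_failure:.3%}")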

Fault Tolerance

Lesson: In a cluster, failure is a real possibility we have to deal with.

What happens if a cluster node disappears?

Fault Tolerance

HDFS handles faults by keeping multiple copies of each block. The number of copies is controlled by the dfs.replication configuration property: default 3.

This also makes it easier to schedule jobs local to data: there are more options.

Fault Tolerance

In Hadoop 3, HDFS fault tolerance can also be done with erasure coding.

This decreases the storage overhead of fault tolerance, but requires reading from several nodes to reconstruct the data. Better for infrequently-accessed data.
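For a rough sense of the savings, assume 3× replication versus a Reed-Solomon 6+3 policy (one of the erasure coding schemes HDFS supports), where every six data blocks share three parity blocks:

# Raw bytes stored per byte of data, under the two fault-tolerance schemes above.
replication_factor = 3
data_blocks, parity_blocks = 6, 3

print("3x replication overhead:", replication_factor)                    # 3.0x
print("RS(6,3) overhead:", (data_blocks + parity_blocks) / data_blocks)  # 1.5x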

Fault Tolerance

If one of the nodes holding a particular block fails, HDFS can declare the node missing and restore the desired replication.

If all of them disappear, that block will be inaccessible, but other files (available on ≥1 node) will keep working.

Fault Tolerance

For YARN, what happens if some work is happening and the node dies? The task might have done nothing. It might have written incomplete output (to a file/​database/​wherever).

Short answer: it will do its best to recover.

Fault Tolerance

Remember that our jobs are broken up into smaller pieces: map or reduce tasks in MapReduce, or Spark tasks that each work on one partition.

If the task writing part-00005 fails, then it should be safe to delete it and start that task on another node (because creating it is idempotent).

Fault Tolerance

But what if our output is a database or something else? What if getting the prerequisites of part-00005 is a huge amount of work?

The answers depend on the details: you should occasionally ask yourself, "what if part of this fails?"

Fault Tolerance

In Spark, you can cache/persist to disk, or checkpoint the contents of a DataFrame (or RDD), so that it can be re-read and the computation resumed if a task fails.


# Option 1: persist — keep the results around (spilling to disk if needed).
from pyspark.storagelevel import StorageLevel
step1 = horrible_long_computation()
step1 = step1.persist(StorageLevel.MEMORY_AND_DISK)
step2 = step1.…

# Option 2: checkpoint — write the results out and cut the lineage.
sc.setCheckpointDir('./spark-checkpoints')
step1 = horrible_long_computation()
step1 = step1.cache().checkpoint()
step2 = step1.…

Fault Tolerance

The difference: checkpointing completely disconnects the execution plan from its ancestors; caching/​persisting doesn't. Checkpointing can also help break up the huge execution plan of a big calculation.

For a DataFrame, .cache() is a shorthand for .persist(MEMORY_AND_DISK). For an RDD, it is shorthand for .persist(MEMORY_ONLY).
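A small sketch of the difference (the toy aggregation is made up here, just to build a multi-step plan), comparing .explain() output before and after checkpointing:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000).groupBy((F.col('id') % 10).alias('k')).count()

cached = df.cache()
cached.explain()         # the full lineage (range, shuffle, aggregate) is still in the plan

spark.sparkContext.setCheckpointDir('./spark-checkpoints')
checkpointed = df.checkpoint()
checkpointed.explain()   # the plan now starts from the checkpointed data: lineage is cut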

Where Your Data Might Be

We have seen several places our data might be stored (permanently or temporarily) while we're working on it.

There are massive speed differences between them: having some (approximate) idea of the tradeoffs is important.

Where Your Data Might Be

Tech          Size      Latency (cycles)   Throughput (B/s)
Registers     few kB    1
L1 cache      <100 kB   4 *                250 G–1 T **
RAM           10s GB    50–100 **          20 G–50 G *
SSD           100s GB   10⁶ *              500 M (per disk) *
Spinning HD   TB        10⁷ *              100 M (per disk) *
Network       PB?       10⁷ + disk         120 M (shared?) *
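To get a feel for the scale of those differences, a back-of-envelope calculation of how long it would take to read 10 GB sequentially at the (approximate) throughputs above:

# Rough time to read 10 GB at the order-of-magnitude throughputs from the table.
data_bytes = 10 * 10**9

throughput = {            # bytes/second, approximate figures
    'RAM':         30e9,
    'SSD':         500e6,
    'Spinning HD': 100e6,
    'Network':     120e6,
}

for tech, bps in throughput.items():
    print(f"{tech:<12} {data_bytes / bps:8.1f} s")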

Where Your Data Might Be

MapReduce's habit of putting map output onto disk seems like an obvious bottleneck.

Keeping data in memory should be much faster (if it fits). Spark gives you that option. You can use it to make your logic run faster, but you may either run out of memory and fail, or spill data to disk anyway.