Go Concurrency Patterns

Go provides some very nice tools for producing safe, concurrent code: goroutines and channels (so far). They are useful, but it's not always obvious how to build every kind of concurrent logic out of them.

There are a few common patterns for concurrent code. Let's look at ways to implement a few of them…

Goroutines as Coroutines

Channels let us write something very much like concurrent coroutines. The notable difference from a plain function is that we give back results on a channel instead of returning them.

We have seen some examples like this, but one more wouldn't hurt…

Goroutines as Coroutines

A sample problem: add up prime numbers from 2 to the largest prime \(\le n\).

It's probably easier to think of this as two parts: (1) produce the primes; (2) add them up. We might be able to reuse (1) for some other task later.

In this case the coroutine pattern is just helping us structure our code better: reusable logic, easy-to-understand pieces, no extra memory to store an array.

Goroutines as Coroutines

We can produce the prime numbers in a fairly standard way:

// Send the primes from 2 up to n on the channel, then close it.
func primesUpTo(n int, results chan int) {
	results <- 2
	for p := 3; p <= n; p += 2 {
		if isPrime(p) {
			results <- p
		}
	}
	close(results)
}

Full code: coroutine.go.
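
The full code supplies isPrime; a minimal trial-division sketch (my assumption about its implementation, not necessarily what coroutine.go does) might look like:

```go
package main

import "fmt"

// isPrime reports whether p is prime, by trial division
// up to the square root of p. (A hypothetical stand-in for
// the isPrime used in coroutine.go.)
func isPrime(p int) bool {
	if p < 2 {
		return false
	}
	for d := 2; d*d <= p; d++ {
		if p%d == 0 {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isPrime(2), isPrime(9), isPrime(13))
}
```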

Goroutines as Coroutines

Then we can consume those values to get the total we want:

// Return the sum of the primes from 2 up to n.
func addPrimesTo(n int) (total int) {
	primes := make(chan int)
	go primesUpTo(n, primes)
	for p := range primes {
		total += p
	}
	return total
}

The concurrency is nice here, but it isn't buying us much: there isn't much calculation happening in this coroutine, so there is little work to overlap.

Goroutines as Coroutines

Another example: from A Tour of Go, the Equivalent Binary Trees exercise.
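
A sketch of that exercise's idea (the Tree type and details here are my own, not the tour's tree.Tree): walk each tree's values into a channel, then compare the two streams.

```go
package main

import "fmt"

// Tree is a simple binary search tree: a stand-in for the
// tour's tree.Tree.
type Tree struct {
	Left  *Tree
	Value int
	Right *Tree
}

// Walk sends the values of t to ch in sorted (in-order) order,
// then closes the channel so the receiver's range loop ends.
func Walk(t *Tree, ch chan int) {
	var walk func(t *Tree)
	walk = func(t *Tree) {
		if t == nil {
			return
		}
		walk(t.Left)
		ch <- t.Value
		walk(t.Right)
	}
	walk(t)
	close(ch)
}

// Same reports whether t1 and t2 contain the same values.
// (On a mismatch, a walker goroutine is left blocked on its
// send; fine for a sketch, not for production code.)
func Same(t1, t2 *Tree) bool {
	c1, c2 := make(chan int), make(chan int)
	go Walk(t1, c1)
	go Walk(t2, c2)
	for v1 := range c1 {
		if v2, ok := <-c2; !ok || v1 != v2 {
			return false
		}
	}
	_, ok := <-c2 // both streams must be exhausted
	return !ok
}

func main() {
	t1 := &Tree{Left: &Tree{Value: 1}, Value: 2, Right: &Tree{Value: 3}}
	t2 := &Tree{Value: 1, Right: &Tree{Value: 2, Right: &Tree{Value: 3}}}
	fmt.Println(Same(t1, t2)) // both hold 1, 2, 3
}
```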

WaitGroups

There's another structure we'll need to get concurrent algorithms working correctly: the sync.WaitGroup. The goal is to be able to tell when all of our goroutines have completed.

A WaitGroup is essentially a thread-safe counter where we can start the counter at the number of goroutines, and count down to zero.

WaitGroups

On a WaitGroup, calling .Add(n) increases the number of tasks we're waiting on; .Done() indicates that one thing has completed; .Wait() waits for the counter to hit zero.

const n = 10
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		defer wg.Done()
		fmt.Println(i)
	}(i)
}
wg.Wait()

Parallel Mapping

There is a limit on how much speedup you can get with even a huge number of processor cores. It depends on the fraction of the work that can be done concurrently: see Amdahl's law. Tasks that can be (almost) completely done in parallel are called embarrassingly parallel.
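
For a sense of scale, Amdahl's law says the best possible speedup with n cores and parallel fraction p is 1/((1−p) + p/n). A quick calculation (the 95% figure is just an illustration, not from any benchmark here):

```go
package main

import "fmt"

// amdahl returns the maximum speedup predicted by Amdahl's law
// for a fraction p of parallelizable work on n cores.
func amdahl(p float64, n int) float64 {
	return 1 / ((1 - p) + p/float64(n))
}

func main() {
	// Even with 95% of the work parallelizable, 12 cores give
	// nowhere near a 12x speedup.
	fmt.Printf("%.2f\n", amdahl(0.95, 12))
}
```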

One thing that is embarrassingly parallel: mapping a function over a list.

Parallel Mapping

The obvious non-concurrent way to do this:

type operType func(float64) float64

func SequentialMap(op operType, values []float64) []float64 {
	result := make([]float64, len(values))
	for pos, val := range values {
		result[pos] = op(val)
	}
	return result
}

Full code for this example: parmap_test.go.

Parallel Mapping

We should do the operations in parallel: start goroutines to do the calculations and wait for them to finish.

This is exactly what a WaitGroup is for…

Parallel Mapping

We can do each task in a goroutine like this:

func GoroutineMap(op operType, values []float64) []float64 {
	result := make([]float64, len(values))
	wg := sync.WaitGroup{}
	wg.Add(len(values))
	for pos, val := range values {
		go func(pos int, val float64) {
			defer wg.Done()
			result[pos] = op(val)
		}(pos, val)
	}
	wg.Wait()
	return result
}

Parallel Mapping

Now that it's running in parallel we should see a speedup…

BenchmarkSequentialMap-12    	     316	   3659042 ns/op
BenchmarkGoroutineMap-12     	      52	  23275163 ns/op

… but we don't, because the overhead of each goroutine (creating and scheduling it, calling the function, updating the WaitGroup, etc.) is much larger than the single operation it performs.

Once again, a piece of work can be too small to be worth doing concurrently. We need to break the task into larger pieces: give each goroutine a chunk of the slice…

Parallel Mapping

func ChunkedMap(op operType, values []float64) []float64 {
	const chunkSize = 100
	length := len(values)
	result := make([]float64, length)
	wg := sync.WaitGroup{}
	for p := 0; p < length; p += chunkSize {
		wg.Add(1)
		go func(start int) {
			defer wg.Done()
			end := min(start+chunkSize, length)
			for pos, val := range values[start:end] {
				result[start+pos] = op(val)
			}
		}(p)
	}
	wg.Wait()
	return result
}

Parallel Mapping

That starts far fewer goroutines, so each one does enough work to pay for its overhead, and we see the speedup we expect (with 6 cores/12 threads):

BenchmarkSequentialMap-12    	     316	   3659042 ns/op
BenchmarkGoroutineMap-12     	      52	  23275163 ns/op
BenchmarkChunkedMap-12       	    1437	    741993 ns/op

Divide and Conquer

Any kind of divide-and-conquer algorithm is an obvious candidate for concurrency.

We will have to figure out which parts of the work can/should be done concurrently, and we again have the problem of avoiding too many concurrent tasks.

Divide and Conquer

As an example, merge sort. The strategy: recursively sort the two halves of the array, then merge the sorted halves into a single sorted result.

Divide and Conquer

We will return copies of the data in a new sorted slice. Because we're using goroutines, we'll mostly send the slice back on a channel (not as a return value).

[It would probably be faster to pre-allocate result arrays and fill them in, but we'll use this as another example of using channels.]

Divide and Conquer

In order to keep the concurrency reasonable, we'll have a threshold of how many levels deep to recurse in parallel. After that, do things sequentially.

To adapt the concurrent code to the API we'd like, a wrapper:

// Merge sort data in slice, with concurrent recursive
// steps concurDepth levels deep
func MergeSort(data []int, concurDepth int) []int {
	result := make(chan []int, 1) // must not block
	mergeSortConcur(data, concurDepth, result)
	return <-result
}

Full code for this example: mergesort_test.go.

Divide and Conquer

Base case: slices of length 0 or 1 are already sorted:

func mergeSortConcur(data []int, depth int, res chan []int) {
	if len(data) < 2 {
		res <- data
		return
	}

Divide and Conquer

Then, do the recursive steps: we need to get the results on a channel, and can re-use the same logic without goroutines for the sequential calls.

mid := len(data) / 2
leftChan := make(chan []int, 1)
rightChan := make(chan []int, 1)
if depth > 0 {
	go mergeSortConcur(data[:mid], depth-1, leftChan)
	go mergeSortConcur(data[mid:], depth-1, rightChan)
} else {
	mergeSortConcur(data[:mid], depth-1, leftChan)
	mergeSortConcur(data[mid:], depth-1, rightChan)
}
left := <-leftChan
right := <-rightChan

Divide and Conquer

Then, merge the sorted halves. We need somewhere to put the result:

result := make([]int, len(data))

See the full code for the merge implementation. Then, send back the result:

res <- result
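
The merge itself is left to the full code; a sketch of a standard two-pointer merge (my assumption, not necessarily how mergesort_test.go does it):

```go
package main

import "fmt"

// merge combines two sorted slices into one sorted slice by
// repeatedly taking the smaller of the two front elements.
func merge(left, right []int) []int {
	result := make([]int, 0, len(left)+len(right))
	for len(left) > 0 && len(right) > 0 {
		if left[0] <= right[0] {
			result = append(result, left[0])
			left = left[1:]
		} else {
			result = append(result, right[0])
			right = right[1:]
		}
	}
	// One of the two is now empty; append whatever remains.
	result = append(result, left...)
	return append(result, right...)
}

func main() {
	fmt.Println(merge([]int{1, 4, 6}, []int{2, 3, 7}))
}
```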

Divide and Conquer

Notes:

  • We can use channels to communicate between non-concurrent code, but we have to make sure the channel doesn't block.
  • Slices are cheap to make (since they are references, not copies); using them is much easier than passing around array bounds.
  • Goroutine calls are a lot like function calls: sometimes, the difference between concurrent and not is very small.
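
The first of those notes is worth seeing concretely: an unbuffered send blocks until someone receives, so sequential code that sends to a channel it will read later deadlocks unless the channel has capacity (which is why the MergeSort wrapper uses make(chan []int, 1)). A tiny demonstration:

```go
package main

import "fmt"

func main() {
	// With capacity 1, this send completes immediately even though
	// nothing is receiving yet. An unbuffered make(chan int) here
	// would deadlock: the send would block this one goroutine forever.
	ch := make(chan int, 1)
	ch <- 42
	fmt.Println(<-ch)
}
```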

Divide and Conquer

As in previous examples, there's a fairly wide range of concurrency levels that work well: we want (1) at least as many goroutines as cores, and (2) not so many that the cost of creating them adds up to a lot of overhead.

Timing here with depth = 0, 2, 4, 8, 16:

BenchmarkMergeSort0-12     	     344	   3604242 ns/op
BenchmarkMergeSort2-12     	     847	   1506233 ns/op
BenchmarkMergeSort4-12     	     998	   1303747 ns/op
BenchmarkMergeSort8-12     	     919	   1389049 ns/op
BenchmarkMergeSort16-12    	     318	   3434642 ns/op

Concurrent Wrap-Up

That was a fast tour of concurrent programming and one language that helps make it easier.

Those who chose Rust or Julia for Assignment 2 will have another basis for comparison: they approach thread safety a different way, but are also designed around the idea that concurrency is a thing that programmers should actually do.

Concurrent Wrap-Up

Go summary: it shows a design philosophy similar to C's: very minimal. But it is nicely modernized and well designed.

Concurrency summary: it's not as hard as an OS course makes it seem. Not every problem is hard to solve concurrently. The language can help, but it can't solve all of our problems.