Go provides some very nice tools to produce safe, concurrent code: goroutines and channels (so far). Those are useful, but it's not always clear how to create every kind of concurrent logic with them.
There are a few common patterns for concurrent code. Let's look at ways to implement a few of them…
Channels let us write something very much like concurrent coroutines. The notable difference from a plain function is that we have to give back results on a channel instead of returning them.
We have seen some examples like this, but one more wouldn't hurt…
A sample problem: add up prime numbers from 2 to the largest prime \(\le n\).
It's probably easier to think of this as two parts: (1) produce the primes; (2) add them up. We might be able to reuse (1) for some other task later.
In this case the coroutine
pattern is just helping us structure our code better: reusable logic, easy-to-understand pieces, no extra memory to store an array.
We can produce the prime numbers in a fairly standard way:
// Produce prime numbers 2 to <=n on the channel.
func primesUpTo(n int, results chan int) {
	results <- 2
	for p := 3; p <= n; p += 2 {
		if isPrime(p) {
			results <- p
		}
	}
	close(results)
}
Full code: coroutine.go.
Then we can consume those values to get the total we want:
// Return the sum of primes from 2 up to <=n.
func addPrimesTo(n int) (total int) {
	primes := make(chan int)
	go primesUpTo(n, primes)
	for p := range primes {
		total += p
	}
	return total
}
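The isPrime helper used by primesUpTo isn't shown here. A simple trial-division version (a sketch, not necessarily what the full code uses) would be enough for the odd candidates the loop generates:

```go
package main

import "fmt"

// isPrime reports whether p is prime, by trial division
// with odd divisors up to sqrt(p). Assumes p >= 2.
func isPrime(p int) bool {
	if p%2 == 0 {
		return p == 2
	}
	for d := 3; d*d <= p; d += 2 {
		if p%d == 0 {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isPrime(2), isPrime(9), isPrime(13)) // true false true
}
```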
The parallelism is nice, but not really doing much: there isn't much calculation happening in this coroutine.
Another example: from A Tour of Go, the Equivalent Binary Trees exercise.
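One way to approach that exercise (a sketch, with a locally defined Tree type standing in for the tour's tree.Tree): walk each tree in-order, sending values to a channel, and compare the two streams.

```go
package main

import "fmt"

// Tree stands in for the tour's tree.Tree type.
type Tree struct {
	Left  *Tree
	Value int
	Right *Tree
}

// Walk sends the tree's values to ch in sorted (in-order)
// order, then closes the channel.
func Walk(t *Tree, ch chan int) {
	var walk func(t *Tree)
	walk = func(t *Tree) {
		if t == nil {
			return
		}
		walk(t.Left)
		ch <- t.Value
		walk(t.Right)
	}
	walk(t)
	close(ch)
}

// Same reports whether t1 and t2 contain the same values.
func Same(t1, t2 *Tree) bool {
	c1, c2 := make(chan int), make(chan int)
	go Walk(t1, c1)
	go Walk(t2, c2)
	for v1 := range c1 {
		if v2, ok := <-c2; !ok || v1 != v2 {
			return false
		}
	}
	_, ok := <-c2 // both streams must end together
	return !ok
}

func main() {
	t1 := &Tree{Value: 2, Left: &Tree{Value: 1}, Right: &Tree{Value: 3}}
	t2 := &Tree{Value: 1, Right: &Tree{Value: 2, Right: &Tree{Value: 3}}}
	fmt.Println(Same(t1, t2)) // true
}
```

Closing the channel is what lets the consumer know the walk is finished, the same trick primesUpTo uses above.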
There's another structure we'll need to get concurrent algorithms working correctly: the sync.WaitGroup. The goal is to be able to tell when all of our goroutines have completed.
A WaitGroup is essentially a thread-safe counter where we can start the counter at the number of goroutines, and count down to zero. On a WaitGroup, calling .Add(n) increases the number of tasks we're waiting on; .Done() indicates that one thing has completed; .Wait() waits for the counter to hit zero.
const n = 10
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
	go func(i int) {
		defer wg.Done()
		fmt.Println(i)
	}(i)
}
wg.Wait()
There is a limit on how much speedup you can get with even a huge number of processor cores. It depends on the fraction of the work that can be done concurrently: see Amdahl's law. Tasks that can be (almost) completely done in parallel are called embarrassingly parallel.
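Amdahl's law makes the limit concrete: if a fraction p of the work can be parallelized, the speedup on n cores is at most 1/((1-p) + p/n). A quick calculation (an illustration, not from the course code) shows how fast this ceiling bites:

```go
package main

import "fmt"

// amdahl returns the theoretical speedup from Amdahl's law
// for parallelizable fraction p of the work on n cores.
func amdahl(p, n float64) float64 {
	return 1 / ((1 - p) + p/n)
}

func main() {
	// Even with 95% of the work parallelizable, 12 cores give
	// well under 12x, and no number of cores beats 1/(1-p) = 20x.
	fmt.Printf("%.2f\n", amdahl(0.95, 12))   // ≈ 7.74
	fmt.Printf("%.2f\n", amdahl(0.95, 1e12)) // ≈ 20.00
}
```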
One thing that is embarrassingly parallel: mapping a function over a list.
The obvious non-concurrent way to do this:
type operType func(float64) float64

func SequentialMap(op operType, values []float64) []float64 {
	result := make([]float64, len(values))
	for pos, val := range values {
		result[pos] = op(val)
	}
	return result
}
Full code for this example: parmap_test.go.
We should do the operations in parallel: start goroutines to do the calculations and wait for them to finish.
This is exactly what a WaitGroup is for…
We can do each task in a goroutine like this:
func GoroutineMap(op operType, values []float64) []float64 {
	result := make([]float64, len(values))
	wg := sync.WaitGroup{}
	wg.Add(len(values))
	for pos, val := range values {
		go func(pos int, val float64) {
			defer wg.Done()
			result[pos] = op(val)
		}(pos, val)
	}
	wg.Wait()
	return result
}
Now that it's running in parallel we should see a speedup…
BenchmarkSequentialMap-12    316    3659042 ns/op
BenchmarkGoroutineMap-12      52   23275163 ns/op
… but we don't, because the overhead of each goroutine (creating it, scheduling it, calling the function, updating the WaitGroup, etc.) is much larger than the work being done.
Once again, too small a piece of work doesn't make sense to do in parallel. We need to break the task into larger pieces. We can choose a larger size…
func ChunkedMap(op operType, values []float64) []float64 {
	const chunkSize = 100
	length := len(values)
	result := make([]float64, length)
	wg := sync.WaitGroup{}
	for p := 0; p < length; p += chunkSize {
		wg.Add(1)
		go func(start int) {
			defer wg.Done()
			end := min(start+chunkSize, length)
			for pos, val := range values[start:end] {
				result[start+pos] = op(val)
			}
		}(p)
	}
	wg.Wait()
	return result
}
That starts far fewer goroutines, and we see the speedup we expect (with 6 cores/12 threads):
BenchmarkSequentialMap-12    316    3659042 ns/op
BenchmarkGoroutineMap-12      52   23275163 ns/op
BenchmarkChunkedMap-12      1437     741993 ns/op
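The fixed chunkSize = 100 above is a hard-coded guess. An alternative (a sketch, not part of the benchmarked code) is to derive the chunk size from the number of available cores, so each core gets a handful of chunks regardless of input length:

```go
package main

import (
	"fmt"
	"runtime"
)

// chunkSizeFor picks a chunk size so that the work splits into
// about 4 chunks per core: enough chunks to balance load, but
// not so many that goroutine overhead dominates.
func chunkSizeFor(length int) int {
	chunks := runtime.NumCPU() * 4
	size := (length + chunks - 1) / chunks // ceiling division
	if size < 1 {
		size = 1
	}
	return size
}

func main() {
	fmt.Println(chunkSizeFor(100000))
}
```

A few chunks per core (rather than exactly one) helps when some chunks finish faster than others.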
Any kind of divide-and-conquer algorithm is an obvious candidate for concurrency.
We will have to figure out what parts of the work can/should be done concurrently, and again have the problem of doing too many things concurrently.
As an example, Merge sort. The strategy: recursively sort halves of the array, and then merge the sorted halves into a single sorted result.
We will return copies of the data in a new sorted slice. Because we're using goroutines, we'll mostly send the slice back on a channel (not as a return value).
[It would probably be faster to pre-allocate result arrays and fill them in, but we'll use this as another example of using channels.]
In order to keep the concurrency reasonable, we'll have a threshold of how many levels deep to recurse in parallel. After that, do things sequentially.
To adapt the concurrent code to the API we'd like, a wrapper:
// Merge sort data in slice, with concurrent recursive
// steps concurDepth levels deep.
func MergeSort(data []int, concurDepth int) []int {
	result := make(chan []int, 1) // must not block
	mergeSortConcur(data, concurDepth, result)
	return <-result
}
Full code for this example: mergesort_test.go.
Base case: slices of length 0 or 1 are already sorted:
func mergeSortConcur(data []int, depth int, res chan []int) {
	if len(data) < 2 {
		res <- data
		return
	}
Then, do the recursive steps: we need to get the results on a channel, and can re-use the same logic without goroutines for the sequential calls.
	mid := len(data) / 2
	leftChan := make(chan []int, 1)
	rightChan := make(chan []int, 1)
	if depth > 0 {
		go mergeSortConcur(data[:mid], depth-1, leftChan)
		go mergeSortConcur(data[mid:], depth-1, rightChan)
	} else {
		mergeSortConcur(data[:mid], depth-1, leftChan)
		mergeSortConcur(data[mid:], depth-1, rightChan)
	}
	left := <-leftChan
	right := <-rightChan
Then, merge the sorted halves. We need somewhere to put the result:
result := make([]int, len(data))
See the full code for the merge implementation. Then, send back the result:
res <- result
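The merge itself is in the full code; a straightforward version (a sketch assuming the same pre-allocated result slice, not necessarily the course's implementation) repeatedly takes the smaller front element of the two sorted halves:

```go
package main

import "fmt"

// merge combines two sorted slices into the pre-allocated
// result slice, which must have length len(left)+len(right).
func merge(left, right, result []int) {
	i, j := 0, 0
	for k := range result {
		// Take from left while it has elements and its front
		// is <= right's front (or right is exhausted).
		if i < len(left) && (j >= len(right) || left[i] <= right[j]) {
			result[k] = left[i]
			i++
		} else {
			result[k] = right[j]
			j++
		}
	}
}

func main() {
	result := make([]int, 6)
	merge([]int{1, 4, 5}, []int{2, 3, 6}, result)
	fmt.Println(result) // [1 2 3 4 5 6]
}
```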
Notes:
As in previous examples, there's a fairly wide range of "best" amounts of concurrency: enough that (1) there are at least as many goroutines as cores, but (2) not so many that creating them adds up to a lot of overhead.
Timing here with depth = 0, 2, 4, 8, 16:
BenchmarkMergeSort0-12     344    3604242 ns/op
BenchmarkMergeSort2-12     847    1506233 ns/op
BenchmarkMergeSort4-12     998    1303747 ns/op
BenchmarkMergeSort8-12     919    1389049 ns/op
BenchmarkMergeSort16-12    318    3434642 ns/op
That was a fast tour of concurrent programming and one language that helps make it easier.
Depending on your language choice for Assignment 2, you may have another basis for comparison. Other languages approach thread safety in different ways, but many newer languages are also designed around the idea that concurrency is something programmers should actually do.
Go summary: it shows a design philosophy similar to C's, staying very minimal, but it is nicely modernized and well designed.
Concurrency summary: it's not as hard as an OS course makes it seem. Not every problem is hard to solve concurrently. The language can help, but can't solve all of our problems.