Avatar

Go multithreading with go routines, channels and waitgroups: a Golang kata

← Back to list
Posted on 09.06.2023
Image by AI on Midjourney
Refill!

It's a well-established fact, that Chuck Norris counted from zero to the infinity. Twice. Today I'm gonna prove it using Golang.

What I plan to achieve:

  • Make a routine that counts from 0 onwards.
  • Run that routine in threads in amount that is equal to the number of CPU cores.
  • Read the output of the threads by consuming a channel, and display that in the main process.
  • The main process should wait for all the threads to complete.
  • On pressing Ctrl+C implement graceful shutdown of the threads.
  • Have a cancellable context the threads will check on, to make sure to come to a halt when the context expires.

In order to achieve this, the following instruments of the language must be used:

  • Go routines that allow running a piece of code in a thread.
  • Channels to communicate with those threads. They work somewhat similar to the NodeJS pipes.
  • Wait groups to sync between the threads and the main process.

Cool, sounds like a plan.

The code

I've spent some time trying to figure the solution, but here is the code eventually:

package main
import (
"context"
"fmt"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
)
type CounterValue struct {
ChuckID string
Value int32
}
func main() {
numCPUs := runtime.NumCPU()
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
// the data channel is used to retrieve the results produced by the other threads
dataChannel := make(chan *CounterValue)
signalChannel := GetSignalChan()
// start another thread that will monitor the signalChannel
go func() {
// reading from the channel. this is a blocking operation
<-signalChannel
fmt.Println("Terminating...")
// on terminate we cancel the context and close the data channel, so the main thread could move on
cancelCtx()
close(signalChannel)
}()
wg := sync.WaitGroup{}
wg.Add(numCPUs) // add numCPUs locks to the wait group
for i := 0; i < numCPUs; i++ {
// fly, my friend
go ChuckNorris(ctx, &wg, dataChannel, fmt.Sprintf("Chuck #%d", i), int32(i*10))
}
// try reading from the channel in an endless cycle. This is a blocking operation,
// but the main thread doesn't do anything useful anyway
for {
msg, open := <-dataChannel
if !open {
break
}
fmt.Printf("%s counts %d\n", msg.ChuckID, msg.Value)
}
// wait until all threads gracefully shut down
wg.Wait()
}
func ChuckNorris(ctx context.Context, wg *sync.WaitGroup, dataChannel chan *CounterValue, id string, increment int32) {
counter := int32(0)
sent := true
for {
// check if the context wasn't cancelled
select {
case <-ctx.Done():
fmt.Printf("%s has left the building\n", id)
wg.Done() // release 1 lock from the wait group
return
default:
}
// imitate some heavy duty
time.Sleep(2 * time.Second)
// do actual work only if the previous one was sent
if sent {
counter += increment
}
// try sending to the channel
select {
case dataChannel <- &CounterValue{
ChuckID: id,
Value: counter,
}:
sent = true
default:
sent = false
}
}
}
// GetSignalChan returns a channel that informs about pressing Ctrl+C
func GetSignalChan() chan os.Signal {
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
return signalChannel
}
The code is licensed under the MIT license

Here is the breakdown of what the code does:

  • It gets the amount of CPU cores.
  • Then it creates a cancellable context.
  • Then it creates an unbuffered data channel, and a signal channel, and starts a go routine that listens to the termination events. When the termination event is given, that go routine cancels the context and closes the data channel.
  • Then it defines a wait group and adds N locks, where N is the amount of CPU cores.
  • After that it launches N go routines that count.
  • And after that, the main process tries to read from the data channel in an endless cycle. This is a blocking operation, but that's fine, since the main thread does not need to do pretty much anything useful after.
  • When the data channel is finally closed, the main process moves on and waits until all the counter threads exit.

Each go routine that counts does the following:

  • It counts non-stop, trying to write the result to the data channel.
  • It checks the context on every iteration. If the context was cancelled, the routine removes one lock from the wait group and exits.

I know, it looks complicated and a bit difficult to wrap the head around, but eventually it turns to be quite straight forward.

I need keep in mind the following for the future:

  • I always need to use make() when creating channels. This will not work: var channel chan int32, but due to some funny reason it won't panic either. It may have something to do with the fact, that channels are "special, reference-like" objects.

That's all, folks!

As usual, the code of the kata is available here. Have fun!


Avatar

Sergei Gannochenko

Business-oriented fullstack engineer, in ❤️ with Tech.
React, Node, Go, Docker, AWS, Jamstack.
15+ years in dev.