
Go multithreading with goroutines, channels and wait groups: a Golang kata
It's a well-established fact that Chuck Norris counted from zero to infinity. Twice. Today I'm gonna prove it using Golang.
What I plan to achieve:
- Make a routine that counts from 0 onwards.
- Run that routine in as many threads as there are CPU cores.
- Read the output of the threads by consuming a channel, and display it in the main process.
- The main process should wait for all the threads to complete.
- On pressing Ctrl+C, shut the threads down gracefully.
- Have a cancellable context the threads check, so that they come to a halt when the context is cancelled.
To achieve this, the following instruments of the language are needed:
- Goroutines, which allow running a piece of code concurrently (they are lightweight and get multiplexed onto OS threads, so in this kata they play the role of threads).
- Channels to communicate with those goroutines. They work somewhat similarly to Node.js pipes.
- Wait groups to synchronise the goroutines with the main process.
Cool, sounds like a plan.
I've spent some time trying to figure out the solution; here is the code I eventually arrived at:
```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"syscall"
	"time"
)

type CounterValue struct {
	ChuckID int32
	Value   int32
}

func main() {
	numCPUs := runtime.NumCPU()

	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()

	// the data channel is used to retrieve the results produced by the other threads
	dataChannel := make(chan *CounterValue)
	signalChannel := GetSignalChan()

	// start another thread that will monitor the signalChannel
	go func() {
		// reading from the channel. this is a blocking operation
		<-signalChannel
		fmt.Println("Terminating...")
		// the context cancellation triggers the shutdown procedure
		cancelCtx()
	}()

	wg := sync.WaitGroup{}
	wg.Add(numCPUs) // add numCPUs locks to the wait group
	for i := 0; i < numCPUs; i++ {
		// fly, my friend
		go ChuckNorris(ctx, &wg, dataChannel, int32(i), int32(i*10))
	}

	// try reading from the channel in an endless cycle. This is a blocking operation,
	// but the main thread doesn't do anything useful anyway.
	// note: a bare break inside the select would only exit the select, hence the label
readLoop:
	for {
		select {
		case <-ctx.Done():
			break readLoop
		case msg, open := <-dataChannel:
			if !open {
				break readLoop
			}
			fmt.Printf("%d counts %d\n", msg.ChuckID, msg.Value)
		}
	}

	// wait until all threads gracefully shut down
	wg.Wait()

	// only now we close the data channel, to avoid a panic on send-to-closed-channel in the Chucks
	close(dataChannel)
}

func ChuckNorris(ctx context.Context, wg *sync.WaitGroup, dataChannel chan<- *CounterValue, id int32, increment int32) {
	defer wg.Done() // release 1 lock from the wait group
	counter := int32(0)
	sent := true
	for {
		// check if the context wasn't cancelled
		select {
		case <-ctx.Done():
			fmt.Printf("Chuck #%d has left the building\n", id)
			return
		default:
		}

		// imitate some heavy duty
		time.Sleep(2 * time.Second)

		// do actual work only if the previous one was sent
		if sent {
			counter += increment
		}

		// try sending to the channel
		select {
		case dataChannel <- &CounterValue{
			ChuckID: id,
			Value:   counter,
		}:
			sent = true
		default:
			sent = false
		}
	}
}

// GetSignalChan returns a channel that informs about pressing Ctrl+C
func GetSignalChan() chan os.Signal {
	signalChannel := make(chan os.Signal, 1)
	signal.Notify(signalChannel,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)
	return signalChannel
}
```
Here is the breakdown of what the code does:
- It gets the number of CPU cores.
- Then it creates a cancellable context.
- Then it creates an unbuffered data channel and a signal channel, and starts a goroutine that listens for the termination signal. When the signal arrives, that goroutine cancels the context.
- Then it defines a wait group and adds N locks, where N is the number of CPU cores.
- After that it launches N goroutines that count.
- And after that, the main process tries to read from the data channel in an endless cycle. This is a blocking operation, but that's fine, since the main thread doesn't have much else useful to do anyway.
- When the context is cancelled, the main process leaves that cycle, waits until all the counting goroutines exit, and only then closes the data channel.
Each goroutine that counts does the following:
- It counts non-stop, trying to write the result to the data channel.
- It checks the context on every iteration. If the context was cancelled, the routine removes one lock from the wait group and exits.
I know it looks complicated and a bit difficult to wrap one's head around, but eventually it turns out to be quite straightforward.
Here is what I need to keep in mind for the future:
- I always need to use make() when creating channels. A bare var channel chan int32 will not work, but for some funny reason it won't panic either: the variable is a nil channel, and sends and receives on a nil channel simply block forever. It has to do with the fact that channels are "special, reference-like" objects (see the little demo below).
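Here's a throwaway sketch of that behaviour. The names are made up for the demo, and the half-second timeout is only there so the program can exit; in real code, a send on a nil channel would just hang forever:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// a declared-but-not-made channel is nil; sends and receives on it block forever
	var nilChannel chan int32

	// a properly initialised channel works as expected
	realChannel := make(chan int32, 1)
	realChannel <- 42
	fmt.Println(<-realChannel) // 42

	// show that a send on the nil channel never completes
	// (guarded by a timeout so the demo still exits)
	done := make(chan struct{})
	go func() {
		nilChannel <- 1 // blocks forever, no panic
		close(done)
	}()
	select {
	case <-done:
		fmt.Println("send on nil channel completed (never happens)")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("send on nil channel is still blocked, as expected")
	}
}
```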
Lazy init is a solid pattern, but when it comes to multi-threading it should be implemented in the right way, to prevent duplicate initialization on concurrent requests. Consider the following code:
```go
package main

import (
	"fmt"
	"sync"
)

type Service struct {
	subSvc    *SubService
	initMutex sync.Mutex
}

func (s *Service) getSubService() (*SubService, error) {
	if s.subSvc == nil {
		s.initMutex.Lock()
		defer s.initMutex.Unlock()
		if s.subSvc == nil {
			fmt.Println("Init!")
			s.subSvc = &SubService{}
		}
	}
	return s.subSvc, nil
}

func (s *Service) GetToken() (string, error) {
	svc, err := s.getSubService()
	if err != nil {
		return "", err
	}
	return svc.GetToken(), nil
}

type SubService struct{}

func (ss *SubService) GetToken() string {
	return "Hello there"
}

func main() {
	serv := &Service{}
	var wg sync.WaitGroup
	wg.Add(3)
	concurrentCall := func() {
		token, err := serv.GetToken()
		if err != nil {
			panic(err)
		}
		fmt.Printf("Token: %s\n", token)
		wg.Done()
	}
	go concurrentCall()
	go concurrentCall()
	go concurrentCall()
	wg.Wait()
}
```
Here, even though concurrentCall() is fired three times simultaneously, the SubService gets initialised exactly once: the nil check is repeated under the mutex, so only the first goroutine that grabs the lock performs the init.
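One caveat worth remembering: the outer, unguarded nil check is technically a data race under the Go memory model (go run -race can flag it). The standard library's sync.Once gives the same "exactly once" guarantee without that caveat. Here is a minimal sketch of the same service rewritten with it, keeping the demo names from above:

```go
package main

import (
	"fmt"
	"sync"
)

type Service struct {
	subSvc   *SubService
	initOnce sync.Once
}

func (s *Service) getSubService() *SubService {
	// Do runs the init function at most once, even under concurrent calls,
	// and every caller waits until it has finished
	s.initOnce.Do(func() {
		fmt.Println("Init!")
		s.subSvc = &SubService{}
	})
	return s.subSvc
}

type SubService struct{}

func (ss *SubService) GetToken() string { return "Hello there" }

func main() {
	serv := &Service{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Printf("Token: %s\n", serv.getSubService().GetToken())
		}()
	}
	wg.Wait()
}
```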
There is a way to implement an analog of setInterval() using goroutines and channels, without sleep(). Here is how it's done:
```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

const Interval = time.Second * 3

func main() {
	ctx := context.Background()
	ctx, cancelCtx := context.WithCancel(ctx)
	defer cancelCtx()

	go func() {
		for {
			select {
			case <-ctx.Done():
				fmt.Printf("context cancelled\n")
				return
			case <-time.After(Interval):
				err := doStuff()
				if err != nil {
					cancelCtx()
					return
				}
				fmt.Printf("tick!\n")
			}
		}
	}()

	// keep the main goroutine alive forever: nothing ever calls wg.Done()
	wg := sync.WaitGroup{}
	wg.Add(1)
	wg.Wait()
}

func doStuff() error {
	return nil
}
```
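For the record, the standard library also ships time.Ticker, which covers the same setInterval-ish use case. A minimal sketch along the same lines (the 10-second timeout is only there so the demo stops on its own):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// the timeout is only here so the demo stops by itself
	ctx, cancelCtx := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancelCtx()

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop() // release the underlying timer when done

	for {
		select {
		case <-ctx.Done():
			fmt.Println("context cancelled")
			return
		case <-ticker.C:
			fmt.Println("tick!")
		}
	}
}
```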
You normally want to pass a context to a goroutine to be able to interrupt its execution, and also communicate with the goroutine using channels.
```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancelCtx := context.WithTimeout(context.Background(), time.Millisecond*80)
	defer cancelCtx()

	wg := sync.WaitGroup{}
	wg.Add(1)

	commChannel := make(chan int)
	go getEm(ctx, &wg, commChannel)

	commChannel <- 1
	commChannel <- 2
	commChannel <- 3
	close(commChannel)

	wg.Wait()
}

func getEm(ctx context.Context, wg *sync.WaitGroup, commChannel chan int) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("context cancelled\n")
			return
		case number, ok := <-commChannel:
			if !ok {
				// channel is closed
				return
			}
			fmt.Printf("number is %d!\n", number)
			select {
			case <-ctx.Done():
				fmt.Printf("context cancelled\n")
				return
			default:
			}
		}
	}
}
```
What is important here:
- The commChannel is closed after the last message is sent. If this is not done, the goroutine will never exit, and wg.Wait() will end in a deadlock.
- The ctx is passed to the goroutine, and if it times out, the goroutine exits.
- The comma ok idiom is used to check whether the channel is closed. Without it, the goroutine would never learn that the channel was closed: reading from a closed channel always succeeds and returns the zero value of the channel type, so the case number := <-commChannel: branch would keep firing and the for loop would spin indefinitely.
- Reading from an open unbuffered channel, or from an empty buffered one, blocks until a value is sent into it.
- Writing to an open unbuffered channel, or to a full buffered one, blocks until the other side reads.
- Reading from a closed channel never blocks: it always succeeds and returns the zero value of the channel type (once any buffered values have been drained). There is a short demo right after this list.
- Writing to a closed channel panics.
- Closing an already closed or nil (uninitialized) channel panics (use sync.Once to close a channel only once).
- Always use the comma ok idiom inside a select statement to check whether the channel is closed, to be on the safe side.
- The select statement is non-deterministic, so the order of the cases isn't guaranteed.
- When multiple cases are ready, the select statement chooses one of them pseudo-randomly (the spec promises a uniform choice), but a case that competes with a constantly-ready channel can still be postponed iteration after iteration, which in practice amounts to condition starvation.
- The select statement becomes non-blocking when there is a default case.
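To make the closed-channel rules and the sync.Once trick a bit more tangible, here is a tiny self-contained demo (the names are made up for the sketch):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int32, 2)
	ch <- 7
	ch <- 8

	// close the channel exactly once, even if several callers race to do it
	var closeOnce sync.Once
	closeChannel := func() { closeOnce.Do(func() { close(ch) }) }
	closeChannel()
	closeChannel() // a bare close(ch) here would panic; with sync.Once it's a no-op

	// ch <- 9 // would panic: send on closed channel

	// reading from a closed channel never blocks: first the buffered values,
	// then zero values with ok == false
	for i := 0; i < 4; i++ {
		value, ok := <-ch
		fmt.Printf("value=%d ok=%v\n", value, ok)
	}
	// prints:
	// value=7 ok=true
	// value=8 ok=true
	// value=0 ok=false
	// value=0 ok=false
}
```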
Since case selection in a select statement is random, a case that competes with a constantly-ready channel can stay untriggered for a long time. This is called "condition starvation", and it can make a goroutine unresponsive, for example to a cancelled context. To avoid this, a nested select statement is used:
```go
func someWorker(ctx context.Context, aVeryBusyChannel chan int) {
	for {
		select {
		case <-ctx.Done():
			return
		case val, ok := <-aVeryBusyChannel:
			if !ok {
				return
			}
			doSomeHeavyWork(val)
			select {
			case <-ctx.Done(): // always check
				return
			default: // move on
			}
		}
	}
}
```
An empty default case in a select statement inside an endless for loop will thrash the CPU:
```go
for {
	select {
	case data := <-ch:
		// process data
	case <-ctx.Done():
		return
	default:
		// empty default: if neither ch nor ctx is ready,
		// this spins in a tight loop, burning CPU!
	}
}
```
Don't use the default case in a select statement inside an endless for loop.
Avoid adding a delay in the default case, too (in an attempt to reduce CPU usage): it's a bad pattern and can delay the goroutine's exit. Needing such a delay usually signals that you don't need the default case at all.
That's all, folks!
As usual, the code of the kata is available here. Have fun!
