Avatar

Go multithreading with go routines, channels and waitgroups: a Golang kata

← Back to list
Posted on 09.06.2023
Last updated on 04.12.2024
Image by AI on Midjourney
Refill!

Table of contents

It's a well-established fact, that Chuck Norris counted from zero to the infinity. Twice. Today I'm gonna prove it using Golang.

What I plan to achieve:

  • Make a routine that counts from 0 onwards.
  • Run that routine in threads in amount that is equal to the number of CPU cores.
  • Read the output of the threads by consuming a channel, and display that in the main process.
  • The main process should wait for all the threads to complete.
  • On pressing Ctrl+C implement graceful shutdown of the threads.
  • Have a cancellable context the threads will check on, to make sure to come to a halt when the context expires.

In order to achieve this, the following instruments of the language must be used:

  • Go routines that allow running a piece of code in a thread.
  • Channels to communicate with those threads. They work somewhat similar to the NodeJS pipes.
  • Wait groups to sync between the threads and the main process.

Cool, sounds like a plan.

# The code

I've spent some time trying to figure the solution, but here is the code eventually:

package main
import (
"context"
"fmt"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
)
type CounterValue struct {
ChuckID string
Value int32
}
func main() {
numCPUs := runtime.NumCPU()
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
// the data channel is used to retrieve the results produced by the other threads
dataChannel := make(chan *CounterValue)
signalChannel := GetSignalChan()
// start another thread that will monitor the signalChannel
go func() {
// reading from the channel. this is a blocking operation
<-signalChannel
fmt.Println("Terminating...")
// on terminate we cancel the context and close the data channel, so the main thread could move on
cancelCtx()
close(signalChannel)
}()
wg := sync.WaitGroup{}
wg.Add(numCPUs) // add numCPUs locks to the wait group
for i := 0; i < numCPUs; i++ {
// fly, my friend
go ChuckNorris(ctx, &wg, dataChannel, fmt.Sprintf("Chuck #%d", i), int32(i*10))
}
// try reading from the channel in an endless cycle. This is a blocking operation,
// but the main thread doesn't do anything useful anyway
for {
msg, open := <-dataChannel
if !open {
break
}
fmt.Printf("%s counts %d\n", msg.ChuckID, msg.Value)
}
// wait until all threads gracefully shut down
wg.Wait()
}
func ChuckNorris(ctx context.Context, wg *sync.WaitGroup, dataChannel chan *CounterValue, id string, increment int32) {
counter := int32(0)
sent := true
for {
// check if the context wasn't cancelled
select {
case <-ctx.Done():
fmt.Printf("%s has left the building\n", id)
wg.Done() // release 1 lock from the wait group
return
default:
}
// imitate some heavy duty
time.Sleep(2 * time.Second)
// do actual work only if the previous one was sent
if sent {
counter += increment
}
// try sending to the channel
select {
case dataChannel <- &CounterValue{
ChuckID: id,
Value: counter,
}:
sent = true
default:
sent = false
}
}
}
// GetSignalChan returns a channel that informs about pressing Ctrl+C
func GetSignalChan() chan os.Signal {
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
return signalChannel
}
The code is licensed under the MIT license

Here is the breakdown of what the code does:

  • It gets the amount of CPU cores.
  • Then it creates a cancellable context.
  • Then it creates an unbuffered data channel, and a signal channel, and starts a go routine that listens to the termination events. When the termination event is given, that go routine cancels the context and closes the data channel.
  • Then it defines a wait group and adds N locks, where N is the amount of CPU cores.
  • After that it launches N go routines that count.
  • And after that, the main process tries to read from the data channel in an endless cycle. This is a blocking operation, but that's fine, since the main thread does not need to do pretty much anything useful after.
  • When the data channel is finally closed, the main process moves on and waits until all the counter threads exit.

Each go routine that counts does the following:

  • It counts non-stop, trying to write the result to the data channel.
  • It checks the context on every iteration. If the context was cancelled, the routine removes one lock from the wait group and exits.

I know, it looks complicated and a bit difficult to wrap the head around, but eventually it turns to be quite straight forward.

I need keep in mind the following for the future:

  • I always need to use make() when creating channels. This will not work: var channel chan int32, but due to some funny reason it won't panic either. It may have something to do with the fact, that channels are "special, reference-like" objects.

# Case 1: implementing the lazy init pattern

Lazy init is a solid pattern, but when it comes to multi-threading it should be implemented in the right way, to prevent duplicate initialization on concurrent requests. Consider the following code:

package main
import (
"fmt"
"sync"
)
type Service struct {
subSvc *SubService
initMutex sync.Mutex
}
func (s *Service) getSubService() (*SubService, error) {
if s.subSvc == nil {
s.initMutex.Lock()
defer s.initMutex.Unlock()
if s.subSvc == nil {
fmt.Println("Init!")
s.subSvc = &SubService{}
}
}
return s.subSvc, nil
}
func (s *Service) GetToken() (string, error) {
svc, err := s.getSubService()
if err != nil {
return "", err
}
return svc.GetToken(), nil
}
type SubService struct {
}
func (ss *SubService) GetToken() string {
return "Hello there"
}
func main() {
serv := &Service{}
var wg sync.WaitGroup
wg.Add(3)
concurrentCall := func() {
token, err := serv.GetToken()
if err != nil {
panic(err)
}
fmt.Printf("Token: %s\n", token)
wg.Done()
}
go concurrentCall()
go concurrentCall()
go concurrentCall()
wg.Wait()
}
The code is licensed under the MIT license

Here, despite the fact that the concurrentCall() is done three times simultaneously, the InitSubService() function will be initialised exactly once, thanks to the mutex.

# Case 2: implementing an analog of setInterval() from JS

There is a way to implement an analog of setInterval() using go routines and channels without sleep(). Here is how it is done.

package main
import (
"context"
"fmt"
"sync"
"time"
)
const Interval = time.Second * 3
func main() {
ctx := context.Background()
ctx, cancelCtx := context.WithCancel(ctx)
defer cancelCtx()
go func() {
for {
select {
case <-ctx.Done():
fmt.Printf("context cancelled\n")
return
case <-time.After(Interval):
err := doStuff()
if err != nil {
cancelCtx()
return
}
fmt.Printf("tick!\n")
}
}
}()
wg := sync.WaitGroup{}
wg.Add(1)
wg.Wait()
}
func doStuff() error {
return nil
}
The code is licensed under the MIT license

# Case 3: controlling the goroutine from outside

You normally want to pass a context to a goroutine to be able to interrupt its execution.

package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
ctx, cancelCtx := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*80))
defer cancelCtx()
wg := sync.WaitGroup{}
wg.Add(1)
go getEm(ctx, &wg)
wg.Wait()
}
func getEm(ctx context.Context, wg *sync.WaitGroup) {
for {
select {
case <- ctx.Done():
wg.Done()
return
case <- someChannel:
// do something useful
default:
time.Sleep(time.Second)
}
}
}
The code is licensed under the MIT license

That's all, folks!

As usual, the code of the kata is available here. Have fun!


Avatar

Sergei Gannochenko

Business-oriented fullstack engineer, in ❤️ with Tech.
Golang, React, TypeScript, Docker, AWS, Jamstack.
19+ years in dev.