Go multithreading with go routines, channels and waitgroups: a Golang kata
It's a well-established fact that Chuck Norris counted from zero to infinity. Twice. Today I'm gonna prove it using Golang.
What I plan to achieve:
- Make a routine that counts from 0 onwards.
- Run as many copies of that routine in parallel threads as there are CPU cores.
- Read the output of the threads from a channel and display it in the main process.
- The main process should wait for all the threads to complete.
- Implement a graceful shutdown of the threads when Ctrl+C is pressed.
- Have a cancellable context that the threads check, so that they come to a halt once the context is cancelled.
In order to achieve this, the following tools of the language are needed:
- Go routines, which run a piece of code concurrently. The Go runtime schedules them onto OS threads, so they are much lighter than threads but play the same role here.
- Channels to communicate with those threads. They work somewhat similarly to Node.js pipes.
- Wait groups to sync between the threads and the main process.
Cool, sounds like a plan.
I spent some time figuring out the solution, and here is the code I eventually came up with:
```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"syscall"
	"time"
)

type CounterValue struct {
	ChuckID string
	Value   int32
}

func main() {
	numCPUs := runtime.NumCPU()

	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()

	// the data channel is used to retrieve the results produced by the other threads
	dataChannel := make(chan *CounterValue)

	signalChannel := GetSignalChan()
	// start another thread that will monitor the signalChannel
	go func() {
		// reading from the channel. this is a blocking operation
		<-signalChannel
		fmt.Println("Terminating...")
		// on terminate we cancel the context, so the counters stop.
		// signalChannel is deliberately not closed: the signal package may still write to it
		cancelCtx()
	}()

	wg := sync.WaitGroup{}
	wg.Add(numCPUs) // increase the wait group counter by numCPUs

	for i := 0; i < numCPUs; i++ {
		// fly, my friend ((i+1)*10, so that Chuck #0 doesn't count by zero)
		go ChuckNorris(ctx, &wg, dataChannel, fmt.Sprintf("Chuck #%d", i), int32((i+1)*10))
	}

	// close the data channel only after every counter has exited,
	// so nobody ever sends to a closed channel (that would panic)
	go func() {
		wg.Wait()
		close(dataChannel)
	}()

	// read from the channel until it is closed. This is a blocking operation,
	// but the main thread doesn't do anything useful anyway.
	// The loop ends only after all threads have gracefully shut down.
	for msg := range dataChannel {
		fmt.Printf("%s counts %d\n", msg.ChuckID, msg.Value)
	}
}

func ChuckNorris(ctx context.Context, wg *sync.WaitGroup, dataChannel chan<- *CounterValue, id string, increment int32) {
	// decrement the wait group counter however this function returns
	defer wg.Done()

	counter := int32(0)
	sent := true

	for {
		// check if the context wasn't cancelled
		select {
		case <-ctx.Done():
			fmt.Printf("%s has left the building\n", id)
			return
		default:
		}

		// imitate some heavy duty
		time.Sleep(2 * time.Second)

		// do actual work only if the previous result was sent
		if sent {
			counter += increment
		}

		// try sending to the channel without blocking
		select {
		case dataChannel <- &CounterValue{
			ChuckID: id,
			Value:   counter,
		}:
			sent = true
		default:
			sent = false
		}
	}
}

// GetSignalChan returns a channel that informs about pressing Ctrl+C
func GetSignalChan() chan os.Signal {
	signalChannel := make(chan os.Signal, 1)
	signal.Notify(signalChannel,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)

	return signalChannel
}
```
Here is the breakdown of what the code does:
- It gets the number of CPU cores.
- Then it creates a cancellable context.
- Then it creates an unbuffered data channel and a signal channel, and starts a go routine that listens to the termination events. When a termination event arrives, that go routine cancels the context.
- Then it defines a wait group and increases its counter by N, where N is the number of CPU cores.
- After that it launches N go routines that count.
- It also starts one more go routine that waits on the wait group and closes the data channel once all the counters have exited. Closing it any earlier could make a counter send to a closed channel, which panics.
- And after that, the main process reads from the data channel in a loop. This is a blocking operation, but that's fine, since the main thread does not need to do anything else in the meantime.
- When the data channel is finally closed, the loop ends. At that point all the counter threads have already exited, so the main process can finish.
Each go routine that counts does the following:
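- It counts non-stop, trying to write the result to the data channel without blocking. If nobody is ready to receive, it keeps the same value and retries on the next iteration instead of counting further.
- It checks the context on every iteration. If the context was cancelled, the routine decrements the wait group counter by one and exits.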
I know, it looks complicated and a bit difficult to wrap your head around, but eventually it turns out to be quite straightforward.
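I need to keep in mind the following for the future:
- I always need to use make() when creating channels. This will not work: var channel chan int32. Such a declaration produces a nil channel (the zero value of a channel type), and sending to or receiving from a nil channel blocks forever instead of panicking; only closing a nil channel panics. So the program just hangs, or, if every go routine ends up blocked, the runtime aborts with "fatal error: all goroutines are asleep - deadlock!".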
Lazy init is a solid pattern, but in multi-threaded code it has to be implemented carefully, to prevent duplicate initialization on concurrent requests. Consider the following code:
```go
package main

import (
	"fmt"
	"sync"
)

type Service struct {
	subSvc    *SubService
	initMutex sync.Mutex
}

func (s *Service) getSubService() (*SubService, error) {
	// take the lock before looking at subSvc: reading the field without the lock
	// while another go routine writes it would be a data race
	s.initMutex.Lock()
	defer s.initMutex.Unlock()

	if s.subSvc == nil {
		fmt.Println("Init!")
		s.subSvc = &SubService{}
	}

	return s.subSvc, nil
}

func (s *Service) GetToken() (string, error) {
	svc, err := s.getSubService()
	if err != nil {
		return "", err
	}

	return svc.GetToken(), nil
}

type SubService struct {
}

func (ss *SubService) GetToken() string {
	return "Hello there"
}

func main() {
	serv := &Service{}

	var wg sync.WaitGroup
	wg.Add(3)

	concurrentCall := func() {
		defer wg.Done()

		token, err := serv.GetToken()
		if err != nil {
			panic(err)
		}
		fmt.Printf("Token: %s\n", token)
	}

	go concurrentCall()
	go concurrentCall()
	go concurrentCall()

	wg.Wait()
}
```
Here, even though concurrentCall() runs three times simultaneously, the SubService is initialised (and "Init!" printed) exactly once, thanks to the mutex.
There is a way to implement an analog of JavaScript's setInterval() using go routines and channels, without calling sleep(). Here is how it is done:
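```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

const Interval = time.Second * 3

func main() {
	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()

	wg := sync.WaitGroup{}
	wg.Add(1)

	go func() {
		// release the wait group when the loop exits, otherwise main would block forever
		defer wg.Done()

		for {
			select {
			case <-ctx.Done():
				fmt.Printf("context cancelled\n")
				return
			// time.After returns a channel that receives a value once Interval has elapsed
			case <-time.After(Interval):
				err := doStuff()
				if err != nil {
					cancelCtx()
					return
				}
				fmt.Printf("tick!\n")
			}
		}
	}()

	// block until the ticking go routine stops
	wg.Wait()
}

func doStuff() error {
	return nil
}
```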
You normally want to pass a context to a goroutine to be able to interrupt its execution.
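```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancelCtx := context.WithTimeout(context.Background(), 80*time.Millisecond)
	defer cancelCtx()

	// someChannel is a placeholder for whatever source of work the go routine consumes
	someChannel := make(chan int)

	wg := sync.WaitGroup{}
	wg.Add(1)
	go getEm(ctx, &wg, someChannel)
	wg.Wait()
}

func getEm(ctx context.Context, wg *sync.WaitGroup, someChannel <-chan int) {
	defer wg.Done()

	for {
		// no default branch with a sleep: select blocks until one of the cases
		// is ready, so the 80ms timeout is noticed right away, not up to a second later
		select {
		case <-ctx.Done():
			fmt.Println("stopping:", ctx.Err())
			return
		case v := <-someChannel:
			// do something useful with v
			fmt.Println("got", v)
		}
	}
}
```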
That's all, folks!
As usual, the code of the kata is available here. Have fun!
Sergei Gannochenko
Golang, React, TypeScript, Docker, AWS, Jamstack.
20+ years in dev.