
Go multithreading with goroutines, channels and wait groups: a Golang kata
It's a well-established fact that Chuck Norris counted from zero to infinity. Twice. Today I'm gonna prove it using Golang.
What I plan to achieve:
- Make a routine that counts from 0 onwards.
- Run that routine in as many goroutines as there are CPU cores.
- Read the output of the goroutines by consuming a channel, and display it in the main process.
- The main process should wait for all the goroutines to complete.
- On pressing Ctrl+C, shut the goroutines down gracefully.
- Have a cancellable context the goroutines check, so that they come to a halt when the context is cancelled.
In order to achieve this, the following instruments of the language must be used:
- Goroutines, which run a piece of code concurrently. They are lightweight threads managed by the Go runtime.
- Channels to communicate with those goroutines. They work somewhat like Node.js pipes.
- Wait groups to sync between the goroutines and the main process.
Cool, sounds like a plan.
It took me some time to figure out the solution, but here is the code I ended up with:
    package main

    import (
    	"context"
    	"fmt"
    	"os"
    	"os/signal"
    	"runtime"
    	"sync"
    	"syscall"
    	"time"
    )

    type CounterValue struct {
    	ChuckID int32
    	Value   int32
    }

    func main() {
    	numCPUs := runtime.NumCPU()

    	ctx, cancelCtx := context.WithCancel(context.Background())
    	defer cancelCtx()

    	// the data channel is used to retrieve the results produced by the other goroutines
    	dataChannel := make(chan *CounterValue)

    	signalChannel := GetSignalChan()

    	// start another goroutine that will monitor the signalChannel
    	go func() {
    		// reading from the channel. this is a blocking operation
    		<-signalChannel
    		fmt.Println("Terminating...")
    		// the context cancellation triggers the shutdown procedure
    		cancelCtx()
    	}()

    	wg := sync.WaitGroup{}
    	wg.Add(numCPUs) // add numCPUs locks to the wait group

    	for i := 0; i < numCPUs; i++ {
    		// fly, my friend (the id must be converted, the function expects int32)
    		go ChuckNorris(ctx, &wg, dataChannel, int32(i), int32(i*10))
    	}

    	// try reading from the channel in an endless cycle. This is a blocking operation,
    	// but the main goroutine doesn't do anything useful anyway
    readLoop:
    	for {
    		select {
    		case <-ctx.Done():
    			// a plain break would only leave the select, so the loop is labelled
    			break readLoop
    		case msg, open := <-dataChannel:
    			if !open {
    				break readLoop
    			}
    			fmt.Printf("%d counts %d\n", msg.ChuckID, msg.Value)
    		}
    	}

    	// wait until all goroutines gracefully shut down
    	wg.Wait()

    	// only now we close the data channel, to avoid send-on-closed-channel panics in the Chucks
    	close(dataChannel)
    }

    func ChuckNorris(ctx context.Context, wg *sync.WaitGroup, dataChannel chan<- *CounterValue, id int32, increment int32) {
    	defer wg.Done() // release 1 lock from the wait group

    	counter := int32(0)
    	sent := true
    	for {
    		// check if the context wasn't cancelled
    		select {
    		case <-ctx.Done():
    			fmt.Printf("Chuck #%d has left the building\n", id)
    			return
    		default:
    		}

    		// imitate some heavy duty
    		time.Sleep(2 * time.Second)

    		// do actual work only if the previous one was sent
    		if sent {
    			counter += increment
    		}

    		// try sending to the channel without blocking
    		select {
    		case dataChannel <- &CounterValue{
    			ChuckID: id,
    			Value:   counter,
    		}:
    			sent = true
    		default:
    			sent = false
    		}
    	}
    }

    // GetSignalChan returns a channel that informs about pressing Ctrl+C
    func GetSignalChan() chan os.Signal {
    	signalChannel := make(chan os.Signal, 1)
    	signal.Notify(signalChannel,
    		syscall.SIGHUP,
    		syscall.SIGINT,
    		syscall.SIGTERM,
    		syscall.SIGQUIT)

    	return signalChannel
    }
Here is the breakdown of what the code does:
- It gets the number of CPU cores.
- Then it creates a cancellable context.
- Then it creates an unbuffered data channel and a signal channel, and starts a goroutine that listens for termination signals. When such a signal arrives, that goroutine cancels the context.
- Then it defines a wait group and adds N locks, where N is the number of CPU cores.
- After that it launches N goroutines that count.
- And after that, the main process reads from the data channel in an endless loop. This is a blocking operation, but that's fine, since the main goroutine has nothing else useful to do anyway.
- When the context is cancelled, the main process leaves the loop, waits until all the counter goroutines exit, and only then closes the data channel.
Each goroutine that counts does the following:
- It counts non-stop, trying to write the result to the data channel.
- It checks the context on every iteration. If the context was cancelled, the goroutine removes one lock from the wait group and exits.
I know, it looks complicated and a bit difficult to wrap your head around, but in the end it turns out to be quite straightforward.
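The sent flag in the counter exists because the send there is non-blocking. A possible alternative, which is my own sketch and not part of the kata, is a blocking send raced against ctx.Done(), so no value is skipped and the goroutine still stops on cancellation. The names countUntilCancelled and collect are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// countUntilCancelled (hypothetical name) sends 0, step, 2*step, ... to out
// until ctx is cancelled. The send blocks, but never past the cancellation.
func countUntilCancelled(ctx context.Context, out chan<- int32, step int32) {
	for counter := int32(0); ; counter += step {
		select {
		case out <- counter:
			// the value was delivered, move on to the next one
		case <-ctx.Done():
			return
		}
	}
}

// collect (hypothetical helper) reads the first n values and then
// cancels the counter through the deferred cancel().
func collect(n int, step int32) []int32 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	out := make(chan int32)
	go countUntilCancelled(ctx, out, step)

	values := make([]int32, 0, n)
	for i := 0; i < n; i++ {
		values = append(values, <-out)
	}
	return values
}

func main() {
	fmt.Println(collect(4, 10)) // prints [0 10 20 30]
}
```

The trade-off is that a slow reader now slows the counter down, which may or may not be what you want.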
I need to keep in mind the following for the future:
- I always need to use make() when creating channels. This will not work: var channel chan int32. It won't panic either, though: the zero value of a channel is nil, and sending to or receiving from a nil channel simply blocks forever.
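To see the nil channel behaviour without hanging the program, a select with a default case can be used as a probe. This is only a small sketch, and canSend is a hypothetical helper name.

```go
package main

import "fmt"

// canSend (hypothetical helper) reports whether a send on ch can proceed right now.
func canSend(ch chan int32) bool {
	select {
	case ch <- 1:
		return true
	default:
		return false
	}
}

func main() {
	var nilChannel chan int32       // zero value: nil, no make() was called
	buffered := make(chan int32, 1) // has room for one value

	fmt.Println(nilChannel == nil)   // true
	fmt.Println(canSend(nilChannel)) // false: a send on a nil channel never proceeds
	fmt.Println(canSend(buffered))   // true: the buffer has a free slot
}
```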
Lazy init is a solid pattern, but when it comes to multi-threading it should be implemented in the right way, to prevent duplicate initialization on concurrent requests. Consider the following code:
    package main

    import (
    	"fmt"
    	"sync"
    )

    type Service struct {
    	subSvc    *SubService
    	initMutex sync.Mutex
    }

    func (s *Service) getSubService() (*SubService, error) {
    	// take the lock before reading s.subSvc: checking the field
    	// outside of the lock would be a data race
    	s.initMutex.Lock()
    	defer s.initMutex.Unlock()

    	if s.subSvc == nil {
    		fmt.Println("Init!")
    		s.subSvc = &SubService{}
    	}

    	return s.subSvc, nil
    }

    func (s *Service) GetToken() (string, error) {
    	svc, err := s.getSubService()
    	if err != nil {
    		return "", err
    	}

    	return svc.GetToken(), nil
    }

    type SubService struct{}

    func (ss *SubService) GetToken() string {
    	return "Hello there"
    }

    func main() {
    	serv := &Service{}

    	var wg sync.WaitGroup
    	wg.Add(3)

    	concurrentCall := func() {
    		defer wg.Done()

    		token, err := serv.GetToken()
    		if err != nil {
    			panic(err)
    		}
    		fmt.Printf("Token: %s\n", token)
    	}

    	go concurrentCall()
    	go concurrentCall()
    	go concurrentCall()

    	wg.Wait()
    }
Here, despite the fact that concurrentCall() runs three times concurrently, the sub-service is initialised exactly once, thanks to the mutex.
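The standard library also has sync.Once, which is made for exactly this kind of one-time initialisation. Below is a sketch of the same idea built on it. The inits field and the countInits helper are hypothetical and are there only to make the result observable.

```go
package main

import (
	"fmt"
	"sync"
)

type SubService struct{}

type Service struct {
	subSvc   *SubService
	initOnce sync.Once
	inits    int // counts initialisations, for demonstration only
}

func (s *Service) getSubService() *SubService {
	// Do runs the function at most once, no matter how many goroutines call it
	s.initOnce.Do(func() {
		s.inits++
		s.subSvc = &SubService{}
	})
	return s.subSvc
}

// countInits (hypothetical helper) calls getSubService from n goroutines
// and returns how many times the initialisation actually ran.
func countInits(n int) int {
	serv := &Service{}

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			serv.getSubService()
		}()
	}
	wg.Wait()

	return serv.inits
}

func main() {
	fmt.Println(countInits(100)) // prints 1
}
```

One thing to remember: if the initialisation can fail and should be retried later, sync.Once is a poor fit, because Do never runs the function a second time.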
There is a way to implement an analog of setInterval() using goroutines and channels, without sleep(). Here is how it is done:
    package main

    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )

    const Interval = time.Second * 3

    func main() {
    	ctx, cancelCtx := context.WithCancel(context.Background())
    	defer cancelCtx()

    	wg := sync.WaitGroup{}
    	wg.Add(1)

    	go func() {
    		// without this, wg.Wait() below never returns, and the runtime
    		// reports a deadlock once this goroutine exits
    		defer wg.Done()

    		for {
    			select {
    			case <-ctx.Done():
    				fmt.Printf("context cancelled\n")
    				return
    			case <-time.After(Interval):
    				err := doStuff()
    				if err != nil {
    					cancelCtx()
    					return
    				}

    				fmt.Printf("tick!\n")
    			}
    		}
    	}()

    	// keep the main process alive while the goroutine is ticking
    	wg.Wait()
    }

    func doStuff() error {
    	return nil
    }
To prevent a goroutine from going astray, you normally want to pass it a context, to be able to interrupt its execution, and communicate with it using channels.
    package main

    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )

    func main() {
    	ctx, cancelCtx := context.WithTimeout(context.Background(), 80*time.Millisecond)
    	defer cancelCtx()

    	wg := sync.WaitGroup{}
    	wg.Add(1)

    	commChannel := make(chan int)

    	go getEm(ctx, &wg, commChannel)

    	commChannel <- 1
    	commChannel <- 2
    	commChannel <- 3

    	close(commChannel)

    	wg.Wait()
    }

    func getEm(ctx context.Context, wg *sync.WaitGroup, commChannel chan int) {
    	defer wg.Done()

    	for {
    		select {
    		case <-ctx.Done():
    			fmt.Printf("context cancelled\n")
    			return
    		case number, ok := <-commChannel:
    			if !ok {
    				// channel is closed
    				return
    			}

    			fmt.Printf("number is %d!\n", number)

    			select {
    			case <-ctx.Done():
    				fmt.Printf("context cancelled\n")
    				return
    			default:
    			}
    		}
    	}
    }
What is important here:
- The commChannel is closed after the last message is sent. If this is not done, the goroutine keeps waiting for more messages and exits only when the context times out. Without the context, wg.Wait() would end in a deadlock.
- The ctx is passed to the goroutine, and if it times out, the goroutine exits.
- The comma ok idiom is used to check if the channel is closed. Without it, the goroutine will never know that the channel is closed: reading from a closed channel always succeeds and returns the zero value of the channel type, so case number := <- commChannel: fires on every iteration and the for loop keeps spinning until the context expires.
- Reading from an open unbuffered or empty buffered channel is always blocking.
- Writing to an open unbuffered or full buffered channel is always blocking.
- Reading from a closed channel is always non-blocking, successful and returns a zero value of the channel type.
- Writing to a closed channel panics.
- Closing a closed or nil (uninitialized) channel panics (use sync.Once to close a channel only once).
- Always use the comma ok idiom inside a select statement to check if the channel is closed, to be on the safe side.
- The select statement is non-deterministic, so the order of the cases doesn't define any priority.
- When multiple cases are ready, the select statement picks one of them via uniform pseudo-random selection. No case gets priority, so a cancellation case can lose to a busy channel several times in a row, which may lead to condition starvation.
- The select statement becomes non-blocking when there is a default case.
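Most of the rules above are easy to check in a few lines. Here is a small sketch that does so. The panics helper is a hypothetical name, it just recovers from a panic and reports it.

```go
package main

import "fmt"

// panics (hypothetical helper) reports whether fn panicked.
func panics(fn func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	fn()
	return false
}

func main() {
	ch := make(chan int, 1)
	ch <- 42
	close(ch)

	v, ok := <-ch
	fmt.Println(v, ok) // 42 true: buffered values survive close()
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false: the channel is closed and drained

	fmt.Println(panics(func() { ch <- 1 }))   // true: send on a closed channel
	fmt.Println(panics(func() { close(ch) })) // true: closing a closed channel

	var nilCh chan int
	fmt.Println(panics(func() { close(nilCh) })) // true: closing a nil channel
}
```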
Since the select statement is non-deterministic, one of the cases may not get picked for a while when another case is constantly ready. This is called "condition starvation", and it can make a goroutine slow to respond. To avoid this, a nested select statement is used:
    func someWorker(ctx context.Context, aVeryBusyChannel chan int) {
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case val, ok := <-aVeryBusyChannel:
    			if !ok {
    				return
    			}

    			doSomeHeavyWork(val)

    			select {
    			case <-ctx.Done(): // always check
    				return
    			default: // move on
    			}
    		}
    	}
    }
An empty default case in a select statement inside an endless for loop is a CPU burner:
    for {
    	select {
    	case data := <-ch:
    		// process data
    	case <-ctx.Done():
    		return
    	default:
    		// Empty default - if ch and ctx aren't ready,
    		// this will spin in a tight loop consuming CPU!
    	}
    }
Don't use an empty default case in a select statement that is the only thing pacing an endless for loop. A non-blocking check with default is fine when another operation in the same iteration blocks, as in the nested select above.
Don't add a delay to the default case in an attempt to reduce CPU usage either: it's a bad pattern and can delay the exit of the goroutine. Such a delay usually signals that you don't need the default case at all.
If you need to drain a channel for leftovers before shutting the application down, you need to make sure the worker does that upon exiting.
    // Task and processTask are assumed to be defined elsewhere
    type Service struct {
    	tasksChannel chan Task
    	done         chan struct{}
    }

    func (s *Service) doWork(ctx context.Context) {
    	defer func() {
    		// drain remaining tasks if needed during shutdown
    		for {
    			select {
    			case task, ok := <-s.tasksChannel:
    				if !ok {
    					// the channel is closed and empty; without this check
    					// the drain loop would spin forever on zero values
    					return
    				}
    				s.processTask(task)
    			default:
    				return
    			}
    		}
    	}()

    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-s.done:
    			return
    		case task, ok := <-s.tasksChannel:
    			if !ok {
    				return
    			}

    			s.processTask(task)

    			select {
    			case <-ctx.Done(): // check if the context is done
    				return
    			case <-s.done: // check if the service is exiting
    				return
    			default:
    			}
    		}
    	}
    }
It's also a good practice to add an exit channel (s.done) to the service, to be able to stop the service from the main process without relying on the context.
To trigger the shutdown, just close the s.done channel.
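Putting the pieces together, here is a runnable sketch of a service with an exit channel, a drain on shutdown, and sync.Once guarding close(). To keep it short, tasks are plain ints, and NewService, Stop, run, drain and the handled field are hypothetical names of mine, not something taken from the kata.

```go
package main

import (
	"fmt"
	"sync"
)

type Service struct {
	tasks    chan int
	done     chan struct{}
	stopOnce sync.Once
	finished sync.WaitGroup
	handled  []int // processed tasks, kept only to make the result observable
}

func NewService(buffer int) *Service {
	s := &Service{tasks: make(chan int, buffer), done: make(chan struct{})}
	s.finished.Add(1)
	go s.run()
	return s
}

func (s *Service) run() {
	defer s.finished.Done()
	for {
		select {
		case <-s.done:
			s.drain()
			return
		case t := <-s.tasks:
			s.handled = append(s.handled, t)
		}
	}
}

// drain processes whatever is still buffered and returns once the buffer is empty.
// The tasks channel is never closed in this sketch, so default is enough here.
func (s *Service) drain() {
	for {
		select {
		case t := <-s.tasks:
			s.handled = append(s.handled, t)
		default:
			return
		}
	}
}

// Stop closes the done channel exactly once and waits for the worker to exit.
func (s *Service) Stop() {
	s.stopOnce.Do(func() { close(s.done) })
	s.finished.Wait()
}

func main() {
	s := NewService(8)
	for i := 1; i <= 5; i++ {
		s.tasks <- i
	}
	s.Stop()
	s.Stop() // safe: sync.Once prevents the second close()

	fmt.Println(len(s.handled)) // prints 5
}
```

Waiting inside Stop() also makes it safe to read s.handled afterwards, since the worker goroutine is guaranteed to be gone by then.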
That's all, folks!
As usual, the code of the kata is available here. Have fun!
