Google PubSub and Go: a Golang kata
Time for a new Go kata, a really brief one! This time, I've decided to refresh my knowledge of the Pub/Sub service of GCP.
The Pub/Sub service is a popular way to enable indirect, asynchronous communication between microservices hosted in the GCP cloud. It works like a chat, but for applications. If a service wants to communicate, it sends a message to a specific topic. Other services, should they be interested in receiving those messages, subscribe to that topic. A subscription can have a filter based on message attributes.
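To make the topic/subscription relationship concrete, here is a toy, in-memory model written for illustration only (it is not the GCP client API): every matching subscription receives each published message, and a subscription may carry an attribute filter. The Topic, Subscription and Message types are hypothetical, and the filter only supports exact key/value matches; the real service uses its own filter expression syntax, which is set when the subscription is created.

```go
package main

import "fmt"

// Message mirrors the two fields the kata uses: a payload and string attributes.
type Message struct {
	Data       []byte
	Attributes map[string]string
}

// Subscription is a toy stand-in for a Pub/Sub subscription: it has its own
// queue and an optional attribute filter (hypothetical: exact matches only).
type Subscription struct {
	Name   string
	Filter map[string]string // nil means "accept every message"
	Queue  []Message
}

// matches reports whether every filter pair is present in the message attributes.
func (s *Subscription) matches(m Message) bool {
	for k, v := range s.Filter {
		if got, ok := m.Attributes[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// Topic fans every published message out to each matching subscription.
type Topic struct {
	Subscriptions []*Subscription
}

// Publish appends the message to the queue of every subscription whose filter matches.
func (t *Topic) Publish(m Message) {
	for _, s := range t.Subscriptions {
		if s.matches(m) {
			s.Queue = append(s.Queue, m)
		}
	}
}

func main() {
	all := &Subscription{Name: "audit"}
	greetings := &Subscription{Name: "greetings", Filter: map[string]string{"Hello": "There"}}
	topic := &Topic{Subscriptions: []*Subscription{all, greetings}}

	topic.Publish(Message{Data: []byte("Hello!"), Attributes: map[string]string{"Hello": "There"}})
	topic.Publish(Message{Data: []byte("Bye!")})

	fmt.Println(all.Name, len(all.Queue))             // audit 2
	fmt.Println(greetings.Name, len(greetings.Queue)) // greetings 1
}
```

The unfiltered subscription sees both messages, while the filtered one only gets the message carrying the Hello=There attribute.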
A few notes on how incoming events are handled. Most of the time, we just create an event handler that receives a message, does some work and exits. While this may seem fine in some cases, there are actually many caveats and pitfalls:
- Concurrency safety. What happens if two identical (or similar) events arrive at the same time? The underlying library typically retrieves events in batches and hands them to a pool of parallel workers. In this case a race condition can easily occur, especially if the handler relies on the current database state.
- If you decide to get rid of parallel processing and handle events one by one to avoid race conditions, you risk an event pile-up: the system starts processing events more slowly than they arrive.
- In certain situations, when the load on the database is too high, you may want to throttle the consumer. But beware of pile-ups here as well.
- If you are certain that the data the handler receives can be split into disjoint subsets, you can combine Go channels with partitioning. For example, send all events for sample product SKUs to a samples channel and all the others to a regular channel. Within each bucket the events are processed one after another, but the system as a whole still processes events in parallel.
- When you route similar events into Go channels, you can also implement debouncing and merge several small events into a bigger chunk. This helps eliminate duplicate work and optimise database queries, effectively improving performance.
At the end of the day, the most important thing here is to "think concurrently" and find a good architecture for each particular case.
Anyway.
For this kata I didn't go deep into abstractions: everything lives in a single main() function. Also, I created the resources by hand rather than with Terraform, because I wanted this kata to be fully dedicated to Go.
There are two ways to work with Google Pub/Sub locally: with an emulator, or against the actual cloud. The emulator case is more or less self-explanatory; to use the cloud, you must create a service account (essentially a non-human account). There are countless articles on how to do it; here is just the very first one I stumbled upon.
After the account is created, download the JSON credentials file and place it somewhere on your local machine. You are gonna need it.
The service is really simple; here it is:
```go
package main

import (
	"context"
	"log"
	"os"

	googlePubSub "cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

func main() {
	projectID := os.Getenv("PROJECT_ID")
	topicID := os.Getenv("TOPIC_ID")
	subscriptionID := os.Getenv("SUBSCRIPTION_ID")
	credentialsFile := os.Getenv("CREDENTIALS_FILE")

	// Read the service account key downloaded from GCP.
	credentials, err := os.ReadFile(credentialsFile)
	if err != nil {
		panic(err)
	}

	// The context is cancelled from the receive callback to stop Receive.
	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()

	pubSubClient, err := googlePubSub.NewClient(ctx, projectID, option.WithCredentialsJSON(credentials))
	if err != nil {
		panic(err)
	}
	defer pubSubClient.Close()

	// The topic must already exist; this is only a reference to it.
	topic := pubSubClient.Topic(topicID)
	defer topic.Stop() // flush pending publishes and stop background goroutines

	message := &googlePubSub.Message{
		Data:       []byte("Hello!"),
		Attributes: map[string]string{"Hello": "There"},
	}

	// Publish is asynchronous; Get blocks until the server returns the message ID.
	publishResult := topic.Publish(ctx, message)
	messageID, err := publishResult.Get(ctx)
	if err != nil {
		panic(err)
	}
	log.Printf("Message was published with id %s", messageID)

	// The subscription must already exist as well.
	subscription := pubSubClient.Subscription(subscriptionID)
	err = subscription.Receive(ctx, func(receiveCtx context.Context, message *googlePubSub.Message) {
		log.Printf("Message received %s", string(message.Data))
		message.Ack()
		cancelCtx() // stop receiving after the first message
	})
	if err != nil {
		panic(err)
	}
	log.Printf("Done")
}
```
What it does is the following:
- First, it reads the content of the JSON credentials file.
- Then, it creates an instance of the Pub/Sub client and, after that, a reference to the topic. The topic itself is a resource, so it must be created beforehand, either with Terraform or via the GCP web interface!
- Then it makes a message and publishes it to the topic.
- Shortly after, it creates a reference to a subscription. Just like with the topic, the subscription must be explicitly provisioned, as it's a resource too. I used the one that was automatically created when I made the topic.
- Then we wait for a message to arrive. As soon as we have it, we acknowledge it. Acknowledgment tells GCP that the message was successfully consumed, so it should not be delivered to this subscription again. If the message is negatively acknowledged (message.Nack()), or is not acknowledged before its ack deadline expires, Pub/Sub redelivers it, possibly to another subscriber client of the same subscription, until it is acknowledged or the message retention period runs out. You should always either ack or nack a message.
- Inside the callback we call cancelCtx(), because we don't want to receive more messages and wish to stop the execution. Cancelling the context makes subscription.Receive return nil once the outstanding callbacks have finished.
Simple as that.
You can read more about Pub/Sub to understand it better. Here is a set of laconic yet informative articles: one, two, three.
Here is my .env.local file; use it as a template:
```
PROJECT_ID=project-id
TOPIC_ID=test-topic
SUBSCRIPTION_ID=test-topic-sub
CREDENTIALS_FILE=/Users/my-user/credentials.json
```
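The kata reads these variables with os.Getenv and does not check them, so a missing one only shows up later as a confusing API error. A small fail-fast loader could look like this; LoadConfig and Config are hypothetical helpers, not part of the kata's code, and the lookup function is injected so the loader can be tested without touching the real environment.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// Config holds the four variables from .env.local.
type Config struct {
	ProjectID, TopicID, SubscriptionID, CredentialsFile string
}

// LoadConfig reads the variables through getenv (e.g. os.LookupEnv) and
// reports every missing or blank one in a single error.
func LoadConfig(getenv func(string) (string, bool)) (Config, error) {
	var missing []string
	get := func(key string) string {
		v, ok := getenv(key)
		if !ok || strings.TrimSpace(v) == "" {
			missing = append(missing, key)
		}
		return v
	}
	// Calls inside a composite literal run left to right, so the
	// missing list keeps this order.
	cfg := Config{
		ProjectID:       get("PROJECT_ID"),
		TopicID:         get("TOPIC_ID"),
		SubscriptionID:  get("SUBSCRIPTION_ID"),
		CredentialsFile: get("CREDENTIALS_FILE"),
	}
	if len(missing) > 0 {
		return Config{}, fmt.Errorf("missing environment variables: %s", strings.Join(missing, ", "))
	}
	return cfg, nil
}

func main() {
	cfg, err := LoadConfig(os.LookupEnv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```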
And the Makefile:
```makefile
run:
	@godotenv -f ./.env.local go run ./cmd/main.go
```
That's all, folks!
As usual, the code of the kata is available here. Have fun!
Sergei Gannochenko
Golang, React, TypeScript, Docker, AWS, Jamstack.
20+ years in dev.