
Production-ready service for face blurring with Go and React. Part 2: Implementation
In the previous article we outlined the architecture of the service; now it's time to implement it. I won't go into too much detail, but I'll try to cover the most important parts.
This article also describes how I would implement a similar service from scratch if I had to.
As mentioned, this cloud-native application consists of three services:
- API service - written in Go, it provides a REST API for uploading images, getting results and checking job status.
- Worker service - written in Go as well, it performs the actual face detection and blurring.
- Web app - written in React and TypeScript, it lets the user upload images and see the results.
Let me "briefly" cover the code structure, the choice of technologies and other boring stuff first, before we get to the juicy part.
Let's briefly cover the main components of the application. I won't include any code snippets, otherwise the article becomes too long.
Logically, the code of the backend services follows the dependency injection pattern and the principles of Hexagonal Architecture.
Basically, each component of the application (be it a logger, an auth provider, a database client, etc.) is either a service, a repository or a converter (adapter). The components implement implicit interfaces and depend on each other through them. The dependencies are created using the Factory Method pattern, much like in Angular or NestJS.
This setup makes the system loosely coupled, modular and easily composable. At any time, any dependency can be swapped with a mock or a stub, which makes the system easily testable.
You can read more about this way of organizing the code in my other article "Dependency management for the dependency injection pattern: a Golang kata".
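To give an idea of what the pattern looks like in practice, here is a minimal sketch; the interface, struct and method names below are made up for illustration and don't match the real codebase.

package user

// ImageRepository is an implicit interface the service depends on;
// the real repository and a test stub both satisfy it.
type ImageRepository interface {
	ListByOwner(ownerID string) ([]Image, error)
}

type Image struct {
	ID      string
	OwnerID string
}

// Service holds its dependencies as interfaces, never as concrete types.
type Service struct {
	repository ImageRepository
}

// NewService is the factory: dependencies are injected by the caller,
// which makes it trivial to swap the repository with a mock in tests.
func NewService(repository ImageRepository) *Service {
	return &Service{repository: repository}
}

func (s *Service) ListImages(ownerID string) ([]Image, error) {
	return s.repository.ListByOwner(ownerID)
}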
The service exposes the following REST endpoints:
- POST /v1/image/upload-url/get - gets a presigned URL for uploading an image.
- POST /v1/image/submit - submits an image for processing.
- POST /v1/image/list - gets the jobs and their statuses.
Note that all endpoints use the POST method: it helps avoid problems with HTTP body support and the limitations of query parameters. Also note that every path begins with the version number and ends with a verb, which makes the whole setup close to an RPC-style API. It's a reasonable approach as long as the API isn't publicly advertised.
The API is declared using Protobuf v3 and compiled to Go and TypeScript with Protoc and Protoweb respectively (see the root Makefile). Protoweb is a tool I wrote specifically to generate code runnable in a browser.
In this setup the gRPC methods are available as well, and the HTTP endpoints are exposed through an internal in-app proxy. It's not a setup for very high load, I know, but for this task it's fine.
Besides the API itself, the /liveness and /healthcheck endpoints are exposed to inform the orchestrator about the status of the service.
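As a rough idea of what these probes boil down to (a sketch, not the project's actual handlers):

package server

import "net/http"

// registerProbes wires up minimal liveness and health handlers; a real
// healthcheck would also verify the database and broker connections.
func registerProbes(mux *http.ServeMux) {
	mux.HandleFunc("/liveness", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK) // the process is up and able to serve
	})
	mux.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) {
		// The real service would ping its dependencies (Postgres, RabbitMQ, ...)
		// here and return 503 if any of them is unreachable.
		w.WriteHeader(http.StatusOK)
	})
}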
Every well-established project has a dedicated authentication microservice instead of auth logic built into each API. For my project, I've decided to go with Auth0: a great auth provider that offers a generous free tier.
Authentication is covered by the auth service, which is a wrapper around the Auth0 client.
On the backend, two middlewares accommodate the auth logic:
- ExtractToken - extracts the token from the request.
- ValidateToken - validates the token.
Upon validation, the user service is used to pull the user data into the request context. The registration pipeline isn't implemented, because it requires a webhook that can't be exposed from a local machine without tunneling.
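A simplified sketch of how such a middleware pair could look; the bodies are illustrative only and use a hypothetical TokenValidator interface rather than the project's auth service.

package middleware

import (
	"context"
	"net/http"
	"strings"
)

type ctxKey string

const tokenKey ctxKey = "token"

// TokenValidator is a hypothetical stand-in for the auth service wrapping Auth0.
type TokenValidator interface {
	ValidateToken(ctx context.Context, token string) error
}

// ExtractToken pulls the bearer token from the Authorization header into the context.
func ExtractToken(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
		next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), tokenKey, token)))
	})
}

// ValidateToken rejects the request if the previously extracted token is invalid.
func ValidateToken(validator TokenValidator, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		token, _ := r.Context().Value(tokenKey).(string)
		if err := validator.ValidateToken(r.Context(), token); err != nil {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, r)
	})
}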
The configuration is pulled from the environment variables using the config service, which is a wrapper around envconfig. There isn't any additional magic to it, except that the service is easily mockable thanks to the dependency injection pattern mentioned above.
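For illustration, this is roughly what an envconfig-based config service boils down to; the struct fields and prefix here are assumptions, not the project's real configuration.

package config

import "github.com/kelseyhightower/envconfig"

// Config holds settings pulled from the environment; the field set is illustrative.
type Config struct {
	HTTPPort    int      `envconfig:"HTTP_PORT" default:"8080"`
	DatabaseURL string   `envconfig:"DATABASE_URL" required:"true"`
	CorsOrigins []string `envconfig:"CORS_ORIGINS"`
}

// Service is the injectable wrapper; in tests it can be replaced with a stub
// returning a hand-crafted Config.
type Service struct {
	config *Config
}

func NewService() (*Service, error) {
	var cfg Config
	// Process reads and parses environment variables with the given prefix.
	if err := envconfig.Process("backend", &cfg); err != nil {
		return nil, err
	}
	return &Service{config: &cfg}, nil
}

func (s *Service) GetConfig() *Config {
	return s.config
}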
Custom errors are used throughout the project. They introduce error codes that map easily to HTTP status codes and log levels. The errors support wrapping and unwrapping, as well as capturing the stack trace, and they implement the standard error interface, so they can be returned by any function that returns a generic error.
Read more about this way of error handling in my other article "Handling errors in a Go based microservice".
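A condensed sketch of the idea (the real error package is richer; this only shows the shape):

package syserr

import (
	"fmt"
	"runtime/debug"
)

type Code string

const (
	InternalCode Code = "internal"  // maps to HTTP 500 and log level "error"
	BadInputCode Code = "bad_input" // maps to HTTP 400 and log level "warning"
)

// Error is a custom error carrying a code, a cause and a stack trace.
type Error struct {
	code  Code
	msg   string
	cause error
	stack []byte
}

func (e *Error) Error() string {
	if e.cause != nil {
		return fmt.Sprintf("%s: %s", e.msg, e.cause)
	}
	return e.msg
}

// Unwrap makes the error compatible with errors.Is and errors.As.
func (e *Error) Unwrap() error { return e.cause }

func (e *Error) Code() Code { return e.code }

// Wrap attaches a message and a captured stack trace to an existing error.
func Wrap(err error, msg string) *Error {
	return &Error{code: InternalCode, msg: msg, cause: err, stack: debug.Stack()}
}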
The logger is just another injectable service, a wrapper around the standard log/slog package. It has a useful LogError() method that converts a custom error into a log entry and maps the severity to the log level.
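In essence, the wrapper looks something like this sketch; the severity mapping in LogError is an assumption about how the custom errors are inspected.

package logger

import (
	"context"
	"log/slog"
)

// Service wraps slog so it can be injected and mocked like any other dependency.
type Service struct {
	logger *slog.Logger
}

func NewService(logger *slog.Logger) *Service {
	return &Service{logger: logger}
}

func (s *Service) Info(ctx context.Context, msg string, args ...any) {
	s.logger.InfoContext(ctx, msg, args...)
}

// LogError picks the slog level based on the error severity; in the real code
// the level would be derived from the custom error's code.
func (s *Service) LogError(ctx context.Context, err error) {
	level := slog.LevelError
	// e.g. bad-input errors could be logged as warnings instead of errors
	s.logger.Log(ctx, level, err.Error())
}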
Postgres was chosen as the database. The logic of interacting with it is implemented in a set of repositories, injectable into the services. The repositories use gorm.io/gorm to talk to the database. The method names are unified and limited to Create, Update, Get, List, Count and Delete. Each method supports working with transactions and thus accepts a transaction handle; the transactions themselves are managed at the service level.
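A rough sketch of one such repository method with a transaction handle; the Image model and the exact signatures are assumptions, not the project's real code.

package repository

import (
	"context"

	"gorm.io/gorm"
)

type Image struct {
	ID     string `gorm:"primaryKey"`
	Status string
}

type ImageRepository struct {
	db *gorm.DB
}

// Create accepts an optional transaction handle; when tx is nil the method
// falls back to the repository's own connection. Transactions themselves are
// opened and committed at the service level.
func (r *ImageRepository) Create(ctx context.Context, tx *gorm.DB, image *Image) error {
	conn := r.db
	if tx != nil {
		conn = tx
	}
	return conn.WithContext(ctx).Create(image).Error
}

// Service-level transaction management then looks roughly like:
//
//	err := db.Transaction(func(tx *gorm.DB) error {
//		return imageRepository.Create(ctx, tx, &image)
//	})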
The business logic is held within the domain services. One service can use another service or a repository as a dependency. The domain services operate on domain-level structs, which are not aware of the database or the API. It's the task of the database- and API-level structures to convert themselves to the domain-level structures, never the other way around.
This approach ensures good horizontal separation of concerns and isolation of the business logic from the rest of the application.
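A tiny self-contained illustration of the conversion direction (both structs are invented for the example):

package converter

// domainImage is the domain-level struct: no storage or transport concerns,
// unaware of the database or the API.
type domainImage struct {
	ID        string
	CreatorID string
	Status    string
}

// dbImage is the database-level struct; it knows how to convert itself
// into the domain representation, never the other way around.
type dbImage struct {
	ID        string `gorm:"primaryKey"`
	CreatorID string
	Status    string
}

func (i *dbImage) toDomain() *domainImage {
	return &domainImage{ID: i.ID, CreatorID: i.CreatorID, Status: i.Status}
}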
Observability is implemented with an injectable monitoring service that wraps the OpenTelemetry libraries for Go and exposes metrics in the Prometheus format via the /metrics endpoint.
The service provides simple functions to log counters, gauges and histograms.
The spans are not exported.
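A minimal sketch of wiring OpenTelemetry metrics to a Prometheus /metrics endpoint; this is not the project's exact code, and the counter name is made up.

package main

import (
	"context"
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/prometheus"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// The Prometheus exporter doubles as a reader for the meter provider.
	exporter, err := prometheus.New()
	if err != nil {
		log.Fatal(err)
	}
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))
	otel.SetMeterProvider(provider)

	// A counter the monitoring service could expose via a simple helper.
	meter := otel.Meter("backend")
	imagesProcessed, err := meter.Int64Counter("images_processed_total")
	if err != nil {
		log.Fatal(err)
	}
	imagesProcessed.Add(context.Background(), 1)

	// Prometheus scrapes the metrics from /metrics.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}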
Websockets are used to notify the client when an image has been processed. The logic of the websocket server could have been a service too, but it's really more of an adapter, and it also has state, so I've placed it in the network package.
The github.com/gorilla/websocket package is used as the underlying websocket library, and a typical /ws endpoint is exposed. A new goroutine is started for every connection.
The goroutine is associated with two channels: one for incoming and one for outgoing messages. References to the channels are stored in the connection object, and the object itself is placed into a pool.
This allows multicasting outgoing messages to several clients at once when needed.
There is also a notable mechanism for checking authentication. With a regular API, authentication is checked before serving each incoming request, so the process is finite and deterministic. With websockets, the connection can potentially be held open indefinitely, so a client may lose its credentials while the connection remains alive.
To remedy this, the client is obliged to send a periodic heartbeat message to the server with a valid JWT attached. If the token is no longer valid, or the client has stopped transmitting the message, the connection is dropped. The first message the client transmits is also expected to be a token message, since the browser WebSocket API doesn't allow sending the token via HTTP headers.
This part was somewhat tricky to build, so I'll share the code here for self-indulgence.
package network

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
	"github.com/samber/lo"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/types/known/timestamppb"

	"backend/interfaces"
	"backend/internal/domain"
	ctxUtil "backend/internal/util/ctx"
	"backend/internal/util/logger"
	"backend/internal/util/syserr"
	"backend/internal/util/types"
	payloadV1 "backend/proto/websocket/payload/server/v1"
	v1 "backend/proto/websocket/v1"
)

const (
	TokenProvisionTimeout = 3
)

type WebsocketConnection struct {
	id           uuid.UUID
	userID       *uuid.UUID
	userSup      *string
	outgoingChan chan *v1.ServerMessage
	incomingChan chan incomingMessage
	expiresAt    *time.Time
	wsConnection *websocket.Conn
}

func (c *WebsocketConnection) IsValid() bool {
	return c.expiresAt.After(time.Now())
}

func (c *WebsocketConnection) NotValid() <-chan struct{} {
	done := make(chan struct{})

	if c.expiresAt == nil {
		close(done)
		return done
	}

	duration := time.Until(*c.expiresAt)
	if duration <= 0 {
		close(done)
		return done
	}

	go func() {
		time.Sleep(duration)
		close(done)
	}()

	return done
}

func (c *WebsocketConnection) Close() error {
	types.CloseChannelSafely(c.outgoingChan)
	types.CloseChannelSafely(c.incomingChan)

	err := c.wsConnection.Close()
	if err != nil {
		return syserr.Wrap(err, "could not close websocket connection")
	}

	return nil
}

func (c *WebsocketConnection) GetKey() string {
	return fmt.Sprintf("%s-%s", c.userID.String(), c.id.String())
}

type incomingMessage struct {
	messageType int
	message     []byte
	err         error
}

type WebsocketServer struct {
	configService   interfaces.ConfigService
	authService     interfaces.AuthService
	loggerService   interfaces.LoggerService
	userService     interfaces.UserService
	eventBusService interfaces.EventBusService
	connections     sync.Map
}

func NewWebsocketServer(
	configService interfaces.ConfigService,
	authService interfaces.AuthService,
	loggerService interfaces.LoggerService,
	userService interfaces.UserService,
	eventBusService interfaces.EventBusService,
) *WebsocketServer {
	return &WebsocketServer{
		configService:   configService,
		authService:     authService,
		loggerService:   loggerService,
		userService:     userService,
		eventBusService: eventBusService,
	}
}

func (s *WebsocketServer) Start(ctx context.Context) error {
	callback := func(event *domain.EventBusEvent) {
		s.loggerService.Info(ctx, "websocket: new event received", logger.F("event", event))

		err := s.DispatchMessage(ctx, event)
		if err != nil {
			s.loggerService.LogError(ctx, syserr.Wrap(err, "could not dispatch event"))
		}
	}

	err := s.eventBusService.AddEventListener(domain.EventBusEventTypeImageProcessed, callback)
	if err != nil {
		return syserr.Wrap(err, "could not start listening to events")
	}

	select {
	case <-ctx.Done():
		return nil
	}
}

func (s *WebsocketServer) Stop() error {
	// todo: call s.eventBusService.RemoveEventListener()
	return nil
}

func (s *WebsocketServer) DispatchMessage(ctx context.Context, event *domain.EventBusEvent) error {
	s.connections.Range(func(key, value interface{}) bool {
		connection, ok := value.(*WebsocketConnection)
		if !ok {
			return true
		}

		recepientID, payload, err := s.convertEventToOutgoingMessage(event)
		if err != nil {
			s.loggerService.LogError(ctx, syserr.Wrap(err, "could not convert event to outgoing message"))
			return true
		}

		if connection.userID != nil && connection.userID.String() == *recepientID {
			connection.outgoingChan <- payload
		}

		return true
	})

	return nil
}

// todo: move it out
func (s *WebsocketServer) convertEventToOutgoingMessage(event *domain.EventBusEvent) (recepientID *string, payload *v1.ServerMessage, err error) {
	switch event.Type {
	case domain.EventBusEventTypeImageProcessed:
		eventPayload := event.Payload.(*domain.EventBusEventPayloadImageProcessed)
		recepientID = lo.ToPtr(eventPayload.CreatorID)
		payload = &v1.ServerMessage{
			Timestamp:      timestamppb.Now(),
			PayloadVersion: "v1",
			Type:           v1.ServerMessageType_SERVER_MESSAGE_TYPE_IMAGE_PROCESSED,
			Payload: &v1.ServerMessage_ImageProcessed{
				ImageProcessed: &payloadV1.ImageProcessed{},
			},
		}
	default:
		return nil, nil, syserr.NewBadInput("unknown event type")
	}

	return recepientID, payload, nil
}

func (s *WebsocketServer) GetHandler() types.HTTPHandler {
	return func(w http.ResponseWriter, r *http.Request) error {
		// todo: record error with defer, if needed
		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()

		upgrader, err := s.getUpgrader()
		if err != nil {
			return syserr.Wrap(err, "could not get upgrader")
		}

		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			return syserr.Wrap(err, "could not upgrade connection")
		}

		connection := &WebsocketConnection{
			id:           uuid.New(),
			userID:       nil, // user will be assigned later
			outgoingChan: make(chan *v1.ServerMessage),
			incomingChan: s.createIncomingChannel(ctx, conn, cancel),
			expiresAt:    nil, // expiration will be assigned later
			wsConnection: conn,
		}

		defer func() {
			err := s.closeAndRemoveConnection(connection)
			if err != nil {
				s.loggerService.LogError(ctx, syserr.Wrap(err, "could not close and remove connection"))
			} else {
				s.loggerService.Info(ctx, "connection closed and removed")
			}
		}()

		s.loggerService.Info(ctx, "websocket is waiting for the token")

		user, err := s.runHandshake(ctx, connection)
		if err != nil {
			return syserr.Wrap(err, "could not conduct handshake")
		}

		ctx = ctxUtil.WithUser(ctx, *user)

		err = s.addConnection(connection)
		if err != nil {
			return syserr.Wrap(err, "error adding a connection")
		}

		return s.serveConnection(ctx, connection)
	}
}

func (s *WebsocketServer) serveConnection(ctx context.Context, connection *WebsocketConnection) error {
	for {
		select {
		case <-ctx.Done():
			return syserr.Wrap(context.Canceled, "context is done, closing the connection")
		case <-connection.NotValid():
			s.loggerService.Warning(ctx, "the token was not updated in time, closing the connection")
			return nil
		case message := <-connection.outgoingChan:
			data, err := protojson.MarshalOptions{
				EmitUnpopulated: true,
			}.Marshal(message)
			if err != nil {
				s.loggerService.LogError(ctx, syserr.Wrap(err, "error marshaling a message"))
				continue
			}

			err = connection.wsConnection.WriteMessage(websocket.TextMessage, data)
			if err != nil {
				s.loggerService.LogError(ctx, syserr.Wrap(err, "error sending a message"))
			}
		case message := <-connection.incomingChan:
			switch message.messageType {
			case websocket.TextMessage:
				err := s.processMessage(ctx, connection, message.message)
				if err != nil {
					s.loggerService.LogError(ctx, syserr.Wrap(err, "error processing an incoming message"))
				}
			case websocket.CloseMessage:
				s.loggerService.Info(ctx, "close message received, closing the connection")
				return nil
			}
		}
	}
}

func (s *WebsocketServer) runHandshake(ctx context.Context, connection *WebsocketConnection) (*domain.User, error) {
	for {
		select {
		case <-ctx.Done():
			return nil, syserr.Wrap(context.Canceled, "ctx done, exiting")
		case message := <-connection.incomingChan:
			s.loggerService.Info(ctx, "token message received")

			protoMessage, err := s.unmarshalMessage(&message)
			if err != nil {
				return nil, syserr.WrapAs(err, syserr.BadInputCode, "could not decode message")
			}

			if protoMessage.GetType() != v1.ClientMessageType_CLIENT_MESSAGE_TYPE_TOKEN_UPDATE {
				return nil, syserr.WrapAs(err, syserr.BadInputCode, "first message is not of token update, closing the connection")
			}

			token := protoMessage.GetTokenUpdate().GetToken()

			sup, expiry, err := s.authService.ValidateToken(ctx, token)
			if err != nil {
				return nil, syserr.NewBadInput("could not validate token")
			}

			user, err := s.userService.GetUserBySUP(ctx, nil, sup)
			if err != nil {
				return nil, syserr.Wrap(err, "could not get user by sup")
			}

			connection.userID = &user.ID
			connection.userSup = &user.Sup
			connection.expiresAt = lo.ToPtr(time.Unix(expiry, 0))

			return user, nil
		case <-time.After(time.Second * TokenProvisionTimeout):
			return nil, syserr.NewBadInput("no token provided before the timeout, closing the connection")
		}
	}
}

func (s *WebsocketServer) getUpgrader() (*websocket.Upgrader, error) {
	config, err := s.configService.GetConfig()
	if err != nil {
		return nil, syserr.Wrap(err, "could not get config")
	}

	return &websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin: func(r *http.Request) bool {
			allowedOrigins := config.Backend.HTTP.Cors.Origin
			if lo.Contains(allowedOrigins, "*") {
				return true
			}
			return lo.Contains(allowedOrigins, r.Header.Get("Origin"))
		},
	}, nil
}

func (s *WebsocketServer) addConnection(connection *WebsocketConnection) error {
	key := connection.GetKey()

	_, hasKey := s.connections.LoadOrStore(key, connection)
	if hasKey {
		return syserr.NewInternal("connection already registered", syserr.F("user_id", connection.userID))
	}

	return nil
}

func (s *WebsocketServer) closeAndRemoveConnection(connection *WebsocketConnection) error {
	err := connection.Close()
	if err != nil {
		return syserr.Wrap(err, "could not close connection")
	}

	if connection.userID != nil {
		key := connection.GetKey()
		s.connections.Delete(key)
	}

	return nil
}

func (s *WebsocketServer) unmarshalMessage(message *incomingMessage) (*v1.ClientMessage, error) {
	var protoMessage v1.ClientMessage

	err := protojson.Unmarshal(message.message, &protoMessage)
	if err != nil {
		return nil, syserr.WrapAs(err, syserr.BadInputCode, "could not unmarshal message")
	}

	return &protoMessage, nil
}

func (s *WebsocketServer) createIncomingChannel(ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc) chan incomingMessage {
	messageChan := make(chan incomingMessage)

	go func() {
		for {
			if ctxUtil.IsDone(ctx) {
				types.CloseChannelSafely(messageChan)
				return
			}

			messageType, message, err := conn.ReadMessage()
			if err != nil {
				s.loggerService.LogError(ctx, syserr.Wrap(err, "could not read incoming message"))
				types.CloseChannelSafely(messageChan)
				cancel()
				return
			}

			messageChan <- incomingMessage{
				messageType: messageType,
				message:     message,
			}
		}
	}()

	return messageChan
}

func (s *WebsocketServer) processMessage(ctx context.Context, connection *WebsocketConnection, payload []byte) error {
	var protoMessage v1.ClientMessage

	err := protojson.Unmarshal(payload, &protoMessage)
	if err != nil {
		return err
	}

	switch protoMessage.GetType() {
	case v1.ClientMessageType_CLIENT_MESSAGE_TYPE_TOKEN_UPDATE:
		err = s.processTokenUpdateMessage(ctx, connection, &protoMessage)
	// todo: add more types here when needed
	default:
		err = syserr.NewBadInput("unrecognised message, skipped", syserr.F("payload", string(payload)))
	}

	return err
}

func (s *WebsocketServer) processTokenUpdateMessage(ctx context.Context, connection *WebsocketConnection, protoMessage *v1.ClientMessage) error {
	token := protoMessage.GetTokenUpdate().GetToken()

	sup, expiry, err := s.authService.ValidateToken(ctx, token)
	if err != nil {
		return syserr.NewBadInput("could not validate token")
	}

	if *connection.userSup != sup {
		return syserr.NewBadInput("user in the token is different from the connection owner")
	}

	connection.expiresAt = lo.ToPtr(time.Unix(expiry, 0))

	s.loggerService.Info(ctx, "token message was processed")

	return nil
}
Besides Websockets, WebRTC could have been considered as an alternative technology. There is also Server-Sent Events (SSE), but that technology is mostly used for one-way streaming of large responses and, in my opinion, doesn't really suit long-lived, full-duplex connections.
Since we have more than one service, we need an effective way for them to communicate asynchronously. An event bus is a good way to achieve this.
The event bus service is a wrapper around RabbitMQ. A topic exchange is created, and all the services subscribe to it. Event dispatching can be invoked from any part of the codebase, and as long as a recipient is subscribed to the topic, it will get the message.
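A stripped-down sketch of publishing to such a topic exchange with the amqp091-go client; the exchange and routing key names are made up, and in the real service the connection and channel would be long-lived and injected.

package eventbus

import (
	"context"

	amqp "github.com/rabbitmq/amqp091-go"
)

// Publish declares a topic exchange and sends one event to it. The payload
// would normally be a serialized domain event.
func Publish(ctx context.Context, url string, body []byte) error {
	conn, err := amqp.Dial(url)
	if err != nil {
		return err
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	// durable topic exchange shared by all services
	if err := ch.ExchangeDeclare("events", "topic", true, false, false, false, nil); err != nil {
		return err
	}

	return ch.PublishWithContext(ctx, "events", "image.processed", false, false, amqp.Publishing{
		ContentType: "application/json",
		Body:        body,
	})
}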
The main() function of the service is its entry point. It initializes the services and starts the server. It also handles graceful shutdown on SIGTERM and SIGINT, letting the services clean up their resources and close their connections. The function is also responsible for returning a non-zero exit code in case of an error, so the container can be restarted by the orchestrator.
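The shutdown part boils down to something like this sketch, built on the standard library; the actual cleanup calls depend on the service.

package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// The context is cancelled as soon as SIGTERM or SIGINT arrives.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	server := &http.Server{Addr: ":8080"}
	go func() {
		if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatal(err) // non-zero exit code lets the orchestrator restart the container
		}
	}()

	<-ctx.Done() // wait for the termination signal

	// Give in-flight requests a grace period, then close connections.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := server.Shutdown(shutdownCtx); err != nil {
		log.Fatal(err)
	}
}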
🤖 Geez, this AI-assisted writing produces sheer paragraphs of text and is definitely getting out of hand. It feels like a book already. Kudos if you've read this far.
The web dashboard is a simple React app based on Create React App (sadly deprecated as of Feb 2025) and Craco.
Talking to the API is done with react-query. The library runs the service functions generated from the protobuf definitions by Protoweb, as already mentioned.
The application itself is pretty basic, and the only two prominent features are the authentication and working with websockets.
The authentication is built using the @auth0/auth0-react library and wrapped into a context provider. There is a widget in the top bar that shows the currently authenticated user, where the user can also sign out. The user can sign in using the Google provider.
The websockets are based on the react-use-websocket library. Whenever the connection fails, the library makes sure it's restored. The logic also dispatches the access token periodically, as the backend requires. The token is taken from the local storage through the @auth0/auth0-react library mentioned above.
Everything mentioned so far is basically boilerplate and the fuss necessary to accommodate the core of the application: the worker service. Let's dive into it deeper.
Technically speaking, the worker service inherits a lot from the API service in terms of code structure, logging, error handling and so on.
❓ Why is the worker even needed? In order to process an image we need to do two things:
- Detect the faces in the image.
- Blur the faces.
These are two pretty heavy operations, and for the service to be reliable and scalable, they cannot be performed synchronously on a user request. The image processing must be asynchronous (which means the user will have to wait longer), and the worker service is responsible for that. Depending on the load, the worker can scale horizontally, consuming the incoming load using partitioning.
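Roughly, the worker's main loop looks like the sketch below; the job source and the detection/blur steps are reduced to hypothetical interfaces, since they don't reflect the actual implementation.

package worker

import "context"

// Job describes a single image to process; JobQueue, FaceDetector and Blurrer
// are hypothetical interfaces standing in for the real dependencies.
type Job struct {
	ImageID string
}

type JobQueue interface {
	Next(ctx context.Context) (*Job, error) // blocks until a job is available
	MarkDone(ctx context.Context, job *Job) error
}

type Rect struct{ X, Y, W, H int }

type FaceDetector interface {
	Detect(ctx context.Context, imageID string) ([]Rect, error)
}

type Blurrer interface {
	Blur(ctx context.Context, imageID string, faces []Rect) error
}

// Run consumes jobs until the context is cancelled. Several workers can run
// in parallel; partitioning of the queue decides which worker gets which job.
func Run(ctx context.Context, queue JobQueue, detector FaceDetector, blurrer Blurrer) error {
	for {
		job, err := queue.Next(ctx)
		if err != nil {
			return err // includes ctx cancellation propagated by the queue
		}
		faces, err := detector.Detect(ctx, job.ImageID)
		if err != nil {
			continue // the real worker would mark the job as failed and retry
		}
		if err := blurrer.Blur(ctx, job.ImageID, faces); err != nil {
			continue
		}
		_ = queue.MarkDone(ctx, job)
	}
}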
❓ Why is the job queue needed?
Todo: talk about the job queue and why it can't be substituted with the message queue.
As we can see, the backend application contains a lot of features, and all of them are essential for a production-ready service to function as expected.
The application is not containerized yet; there are no Dockerfiles. The application also needs to be deployed to an orchestrator, like Kubernetes. For that, the corresponding infrastructure setup must be done, preferably following the infrastructure-as-code approach. CI/CD must be put in place to build new images and push them to the container registry. The deployment process must be automated using tools like Spinnaker or ArgoCD.
This was a nice project to work on. It covers a lot of topics and technologies relevant to modern cloud-native applications and fullstack development. Certainly, there is still a lot to talk about, like DevOps, CI/CD, security and so on, but it's a topic for another article.
The code is, as always, available on GitHub. Enjoy!
