
Production ready service for face blurring with Go and React. Part 2: Implementation
Articles in this series
In the previous article we outlined the architecture of the service; now it's time to implement it. I won't go too deep into the details, but I'll try to cover the most important parts.
This article also describes how I would implement a similar service from scratch if I had to.
As mentioned, this cloud-native application consists of three services:
- API service - written in Go, it provides a REST API for uploading images, getting the results and checking the status of a job.
- Worker service - also written in Go, it performs the actual face detection and blurring.
- Web app - written in React and TypeScript, it allows users to upload images and see the results.
Let me "briefly" cover the code structure, the choice of technologies and other boring stuff first, before we get to the juicy part. I'll keep the code snippets to a minimum here, otherwise the article would become far too long.
Logically, the code of the backend services follows the dependency injection pattern and the principles of Hexagonal Architecture.
Basically, each component of the application (be it a logger, an auth provider, a database client, etc.) is either a service, a repository or a converter (adapter). These parts implement implicit interfaces and depend on each other through them. The dependencies are created using the Factory Method pattern, as in Angular or NestJS.
This setup makes the system loosely coupled, modular and easily composable. At any time any dependency can be swapped with a mock or stub, which makes the system easily testable.
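Here is a minimal sketch of the pattern (the names are illustrative, not taken from the repo): the consumer declares a narrow interface, any implementation satisfies it implicitly, and a factory constructor wires the dependency in.

package user

import "context"

// User is the domain entity the service works with.
type User struct {
    ID   string
    Name string
}

// UserRepository is declared by the consumer; any struct with a matching
// method set satisfies it implicitly, including a mock in tests.
type UserRepository interface {
    GetByID(ctx context.Context, id string) (*User, error)
}

// Service holds its dependencies behind interfaces.
type Service struct {
    repository UserRepository
}

// NewService is the factory: dependencies arrive through the constructor
// and are never created inside the component itself.
func NewService(repository UserRepository) *Service {
    return &Service{repository: repository}
}

func (s *Service) GetUser(ctx context.Context, id string) (*User, error) {
    return s.repository.GetByID(ctx, id)
}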
You can read more about this way of organizing the code in my other article, "Dependency management for the dependency injection pattern: a Golang kata".
The service exposes the following REST endpoints:
- POST /v1/image/upload-url/get - get a presigned URL for uploading an image.
- POST /v1/image/submit - submit an image for processing.
- POST /v1/image/list - get the jobs and their statuses.
Note that all endpoints use the POST method: it helps avoid problems with HTTP body support and the limitations of query parameters. Also note that every path begins with the version number and ends with a verb, which makes the whole setup close to an RPC-style API. It's a reasonable approach as long as the API isn't publicly advertised.
The API is declared using Protobuf v3 and compiled into Go and TypeScript using Protoc and Protoweb respectively (see the root Makefile). Protoweb is a tool I wrote specifically to generate code runnable in a browser.
In this setup the gRPC methods are available at the same time, and the HTTP endpoints are exposed through an internal in-app proxy. It's not a high-load setup, I know, but for this task it's okay.
Besides the API itself, the /liveness and /healthcheck endpoints are exposed, in order to inform the orchestrator about the status of the service.
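For illustration, here is roughly what such probes tend to look like with net/http; this is a sketch, not the repo's actual handlers. Liveness only confirms the process responds, while the health check also verifies a critical dependency.

package main

import (
    "database/sql"
    "net/http"
)

// livenessHandler only confirms the process is responsive; the orchestrator
// restarts the pod if it stops answering.
func livenessHandler(w http.ResponseWriter, _ *http.Request) {
    w.WriteHeader(http.StatusOK)
}

// healthcheckHandler additionally verifies a critical dependency, so the
// orchestrator can stop routing traffic to an unhealthy instance.
func healthcheckHandler(db *sql.DB) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        if err := db.PingContext(r.Context()); err != nil {
            http.Error(w, "database unreachable", http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
    }
}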
Every well-established project has a dedicated authentication microservice instead of auth logic built into each API. For my project I've decided to go with Auth0, a great auth provider that offers a generous free tier.
Authentication is covered by the auth service, which is a wrapper around the Auth0 client.
On the backend, two middlewares accommodate the auth logic:
- ExtractToken - extracts the token from the request.
- ValidateToken - validates the token.
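A minimal sketch of how such a pair can be chained with net/http (the function names match the middlewares above, but the bodies are illustrative, and validate stands in for the Auth0-backed auth service):

package middleware

import (
    "context"
    "net/http"
    "strings"
)

type contextKey string

const tokenKey contextKey = "token"

// ExtractToken pulls the Bearer token from the Authorization header and
// stores it in the request context for downstream middlewares.
func ExtractToken(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        header := r.Header.Get("Authorization")
        token := strings.TrimPrefix(header, "Bearer ")
        ctx := context.WithValue(r.Context(), tokenKey, token)
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

// ValidateToken rejects the request unless the previously extracted token
// passes validation.
func ValidateToken(validate func(ctx context.Context, token string) error) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            token, _ := r.Context().Value(tokenKey).(string)
            if token == "" || validate(r.Context(), token) != nil {
                http.Error(w, "unauthorized", http.StatusUnauthorized)
                return
            }
            next.ServeHTTP(w, r)
        })
    }
}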
Upon validation, the user service is used to pull the user data into the request context. The registration pipeline isn't implemented, because it involves a webhook that can't be exposed from a local machine without tunneling.
The configuration is pulled from the environment variables using the config service, which is a wrapper around envconfig. There isn't any additional magic to it, except that the service is easily mockable thanks to the dependency injection pattern mentioned above.
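For reference, a minimal sketch of what such a wrapper boils down to, assuming the widely used github.com/kelseyhightower/envconfig and an illustrative subset of config fields:

package config

import "github.com/kelseyhightower/envconfig"

// Config shows an illustrative subset of fields; the repo's real structure
// is larger and nested.
type Config struct {
    HTTPPort    int    `envconfig:"BACKEND_HTTP_PORT" default:"8080"`
    DatabaseURL string `envconfig:"BACKEND_DATABASE_URL" required:"true"`
    ModelPath   string `envconfig:"BACKEND_WORKER_MODEL_PATH"`
}

// Load populates the config from environment variables in one call.
func Load() (*Config, error) {
    var cfg Config
    if err := envconfig.Process("", &cfg); err != nil {
        return nil, err
    }
    return &cfg, nil
}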
Custom errors are used in the project. They introduce error codes that map easily to HTTP status codes and log levels. The errors support wrapping and unwrapping, and capture the stack trace. They implement the standard error interface, so they can be returned by any function that returns a generic error.
Read more about this way of error handling in my other article "Handling errors in a Go based microservice".
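To make the idea concrete, here is a simplified sketch of such an error type; the repo's syserr package is richer (it also captures stack traces and structured fields):

package syserr

import (
    "errors"
    "fmt"
    "net/http"
)

type Code string

const (
    InternalCode Code = "internal"
    BadInputCode Code = "bad_input"
)

// Error carries a code alongside the message and a wrapped cause.
type Error struct {
    code    Code
    message string
    cause   error
}

func (e *Error) Error() string {
    if e.cause != nil {
        return fmt.Sprintf("%s: %v", e.message, e.cause)
    }
    return e.message
}

// Unwrap makes the type compatible with errors.Is and errors.As.
func (e *Error) Unwrap() error { return e.cause }

// Wrap attaches a message to a cause, keeping the chain intact.
func Wrap(cause error, message string) *Error {
    return &Error{code: InternalCode, message: message, cause: cause}
}

// HTTPStatus maps an error code to an HTTP status code.
func HTTPStatus(err error) int {
    var sysErr *Error
    if errors.As(err, &sysErr) && sysErr.code == BadInputCode {
        return http.StatusBadRequest
    }
    return http.StatusInternalServerError
}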
The logger is just another injectable service, a wrapper around the standard log/slog. It has a useful LogError() method that converts a custom error into a log entry and maps the severity to the log level.
The database is chosen to be Postgres. The logic of interacting with the database is implemented in a set of repositories, injectable into the services. The repositories use gorm.io/gorm to interact with the database. The names of the methods are somewhat unified, limited to Create, Update, Get, List, Count and Delete. Each method supports working with transactions and thus accepts a transaction handle. The transactions are managed at the service level.
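A sketch of the transaction-handle convention (types and fields are illustrative): when tx is nil the repository uses its own connection, otherwise it joins the transaction the service opened.

package repository

import (
    "context"

    "gorm.io/gorm"
)

type Image struct {
    ID  string
    URL string
}

type ImageRepository struct {
    db *gorm.DB
}

func NewImageRepository(db *gorm.DB) *ImageRepository {
    return &ImageRepository{db: db}
}

// Create inserts a record; when tx is nil the repository falls back to its
// own connection, otherwise it joins the caller-managed transaction.
func (r *ImageRepository) Create(ctx context.Context, tx *gorm.DB, image *Image) error {
    conn := r.db
    if tx != nil {
        conn = tx
    }
    return conn.WithContext(ctx).Create(image).Error
}

The service then owns the transaction boundary, e.g. db.Transaction(func(tx *gorm.DB) error { ... }), passing tx into every repository call that must be atomic.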
The business logic is held within the domain services. One service can use another service or a repository as a dependency. The domain services operate on domain-level structs, which are not aware of the database or the API. It's the task of the database- or API-level structures to convert themselves to the domain-level structures, but never the other way around.
This approach ensures good horizontal separation of concerns and isolation of the business logic from the rest of the application.
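A sketch of that conversion direction (field names are illustrative; in the repo the two structs live in separate packages, they are shown side by side here for brevity):

package database

import "time"

// DomainImage stands in for the domain-level struct: no gorm or JSON tags,
// pure business data, unaware of persistence.
type DomainImage struct {
    ID        string
    URL       string
    CreatedAt time.Time
}

// Image is the persistence-level struct; it knows about column mapping.
type Image struct {
    ID        string    `gorm:"primaryKey"`
    URL       string    `gorm:"column:url"`
    CreatedAt time.Time `gorm:"column:created_at"`
}

// ToDomain converts the database struct into the domain struct, so the
// dependency always points inward, toward the domain.
func (i *Image) ToDomain() *DomainImage {
    return &DomainImage{
        ID:        i.ID,
        URL:       i.URL,
        CreatedAt: i.CreatedAt,
    }
}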
Observability is implemented with an injectable monitoring service that wraps the OpenTelemetry libraries for Go and exposes metrics in the Prometheus format via the /metrics endpoint.
The service provides simple functions to record counters, gauges and histograms.
The spans are not exported.
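A minimal sketch of such a wrapper over the OpenTelemetry API; judging by the calls in the listings below, the repo's signatures also take a unit and a description, and support explicit histogram buckets:

package monitoring

import (
    "context"

    "go.opentelemetry.io/otel"
)

// Service is a thin wrapper around the global OpenTelemetry meter provider;
// the method set in the repo may differ.
type Service struct{}

// AddInt64Counter resolves a counter on the named meter and increments it.
func (s *Service) AddInt64Counter(ctx context.Context, meterName, name string, value int64) error {
    counter, err := otel.Meter(meterName).Int64Counter(name)
    if err != nil {
        return err
    }
    counter.Add(ctx, value)
    return nil
}

// RecordInt64Histogram resolves a histogram on the named meter and records a value.
func (s *Service) RecordInt64Histogram(ctx context.Context, meterName, name string, value int64) error {
    histogram, err := otel.Meter(meterName).Int64Histogram(name)
    if err != nil {
        return err
    }
    histogram.Record(ctx, value)
    return nil
}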
Websockets are used to notify the client when an image is processed. The logic of the websocket server could have been a service too, but it's really more of an adapter, and it also has state, so I've placed it in the network package.
github.com/gorilla/websocket is used as the underlying websocket library, and a typical /ws endpoint is exposed. For every connection, a new goroutine is started.
The goroutine is also associated with two channels: for the incoming and the outgoing messages. The references to the channels are stored in the connection object, and the object itself is placed into a pool.
This allows multicasting an outgoing message to several clients at once, when needed.
There is also a notable mechanism for checking authentication. With a regular API, authentication is checked before serving each incoming request, so the process is finite and deterministic. With websockets, a connection can be held open indefinitely, so a client's credentials may expire while the connection remains alive.
To remedy this, the client is obliged to send a periodic heartbeat message to the server with a valid JWT attached. If the token is no longer valid, or the client stops transmitting the message, the connection is dropped. The first message the client transmits is also expected to be the token message, as the browser WebSocket API doesn't allow attaching custom HTTP headers to the handshake.
This part was somewhat tricky to build, so I'll share the code here for self-indulgence.
package network

import (
    "context"
    "fmt"
    "net/http"
    "sync"
    "time"

    "github.com/google/uuid"
    "github.com/gorilla/websocket"
    "github.com/samber/lo"
    "google.golang.org/protobuf/encoding/protojson"
    "google.golang.org/protobuf/types/known/timestamppb"

    "backend/interfaces"
    "backend/internal/domain"
    ctxUtil "backend/internal/util/ctx"
    "backend/internal/util/logger"
    "backend/internal/util/syserr"
    "backend/internal/util/types"
    payloadV1 "backend/proto/websocket/payload/server/v1"
    v1 "backend/proto/websocket/v1"
)

const (
    TokenProvisionTimeout = 3 // seconds
)

type WebsocketConnection struct {
    id           uuid.UUID
    userID       *uuid.UUID
    userSup      *string
    outgoingChan chan *v1.ServerMessage
    incomingChan chan incomingMessage
    expiresAt    *time.Time
    wsConnection *websocket.Conn
}

func (c *WebsocketConnection) IsValid() bool {
    return c.expiresAt.After(time.Now())
}

// NotValid returns a channel that is closed as soon as the connection's
// token expires, so it can be used directly in a select statement.
func (c *WebsocketConnection) NotValid() <-chan struct{} {
    done := make(chan struct{})
    if c.expiresAt == nil {
        close(done)
        return done
    }
    duration := time.Until(*c.expiresAt)
    if duration <= 0 {
        close(done)
        return done
    }
    go func() {
        time.Sleep(duration)
        close(done)
    }()
    return done
}

func (c *WebsocketConnection) Close() error {
    types.CloseChannelSafely(c.outgoingChan)
    types.CloseChannelSafely(c.incomingChan)
    err := c.wsConnection.Close()
    if err != nil {
        return syserr.Wrap(err, "could not close websocket connection")
    }
    return nil
}

func (c *WebsocketConnection) GetKey() string {
    return fmt.Sprintf("%s-%s", c.userID.String(), c.id.String())
}

type incomingMessage struct {
    messageType int
    message     []byte
    err         error
}

type WebsocketServer struct {
    configService   interfaces.ConfigService
    authService     interfaces.AuthService
    loggerService   interfaces.LoggerService
    userService     interfaces.UserService
    eventBusService interfaces.EventBusService
    connections     sync.Map
}

func NewWebsocketServer(
    configService interfaces.ConfigService,
    authService interfaces.AuthService,
    loggerService interfaces.LoggerService,
    userService interfaces.UserService,
    eventBusService interfaces.EventBusService,
) *WebsocketServer {
    return &WebsocketServer{
        configService:   configService,
        authService:     authService,
        loggerService:   loggerService,
        userService:     userService,
        eventBusService: eventBusService,
    }
}

func (s *WebsocketServer) Start(ctx context.Context) error {
    callback := func(event *domain.EventBusEvent) {
        s.loggerService.Info(ctx, "websocket: new event received", logger.F("event", event))
        err := s.DispatchMessage(ctx, event)
        if err != nil {
            s.loggerService.LogError(ctx, syserr.Wrap(err, "could not dispatch event"))
        }
    }
    err := s.eventBusService.AddEventListener(domain.EventBusEventTypeImageProcessed, callback)
    if err != nil {
        return syserr.Wrap(err, "could not start listening to events")
    }
    <-ctx.Done()
    return nil
}

func (s *WebsocketServer) Stop() error {
    // todo: call s.eventBusService.RemoveEventListener()
    return nil
}

func (s *WebsocketServer) DispatchMessage(ctx context.Context, event *domain.EventBusEvent) error {
    s.connections.Range(func(key, value interface{}) bool {
        connection, ok := value.(*WebsocketConnection)
        if !ok {
            return true
        }
        recepientID, payload, err := s.convertEventToOutgoingMessage(event)
        if err != nil {
            s.loggerService.LogError(ctx, syserr.Wrap(err, "could not convert event to outgoing message"))
            return true
        }
        if connection.userID != nil && connection.userID.String() == *recepientID {
            connection.outgoingChan <- payload
        }
        return true
    })
    return nil
}

// todo: move it out
func (s *WebsocketServer) convertEventToOutgoingMessage(event *domain.EventBusEvent) (recepientID *string, payload *v1.ServerMessage, err error) {
    switch event.Type {
    case domain.EventBusEventTypeImageProcessed:
        eventPayload := event.Payload.(*domain.EventBusEventPayloadImageProcessed)
        recepientID = lo.ToPtr(eventPayload.CreatorID)
        payload = &v1.ServerMessage{
            Timestamp:      timestamppb.Now(),
            PayloadVersion: "v1",
            Type:           v1.ServerMessageType_SERVER_MESSAGE_TYPE_IMAGE_PROCESSED,
            Payload: &v1.ServerMessage_ImageProcessed{
                ImageProcessed: &payloadV1.ImageProcessed{},
            },
        }
    default:
        return nil, nil, syserr.NewBadInput("unknown event type")
    }
    return recepientID, payload, nil
}

func (s *WebsocketServer) GetHandler() types.HTTPHandler {
    return func(w http.ResponseWriter, r *http.Request) error {
        // todo: record error with defer, if needed
        ctx, cancel := context.WithCancel(r.Context())
        defer cancel()

        upgrader, err := s.getUpgrader()
        if err != nil {
            return syserr.Wrap(err, "could not get upgrader")
        }
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            return syserr.Wrap(err, "could not upgrade connection")
        }

        connection := &WebsocketConnection{
            id:           uuid.New(),
            userID:       nil, // user will be assigned later
            outgoingChan: make(chan *v1.ServerMessage),
            incomingChan: s.createIncomingChannel(ctx, conn, cancel),
            expiresAt:    nil, // expiration will be assigned later
            wsConnection: conn,
        }

        defer func() {
            err := s.closeAndRemoveConnection(connection)
            if err != nil {
                s.loggerService.LogError(ctx, syserr.Wrap(err, "could not close and remove connection"))
            } else {
                s.loggerService.Info(ctx, "connection closed and removed")
            }
        }()

        s.loggerService.Info(ctx, "websocket is waiting for the token")
        user, err := s.runHandshake(ctx, connection)
        if err != nil {
            return syserr.Wrap(err, "could not conduct handshake")
        }
        ctx = ctxUtil.WithUser(ctx, *user)

        err = s.addConnection(connection)
        if err != nil {
            return syserr.Wrap(err, "error adding a connection")
        }

        return s.serveConnection(ctx, connection)
    }
}

func (s *WebsocketServer) serveConnection(ctx context.Context, connection *WebsocketConnection) error {
    for {
        select {
        case <-ctx.Done():
            return syserr.Wrap(context.Canceled, "context is done, closing the connection")
        case <-connection.NotValid():
            s.loggerService.Warning(ctx, "the token was not updated in time, closing the connection")
            return nil
        case message := <-connection.outgoingChan:
            data, err := protojson.MarshalOptions{
                EmitUnpopulated: true,
            }.Marshal(message)
            if err != nil {
                s.loggerService.LogError(ctx, syserr.Wrap(err, "error marshaling a message"))
                continue
            }
            err = connection.wsConnection.WriteMessage(websocket.TextMessage, data)
            if err != nil {
                s.loggerService.LogError(ctx, syserr.Wrap(err, "error sending a message"))
            }
        case message := <-connection.incomingChan:
            switch message.messageType {
            case websocket.TextMessage:
                err := s.processMessage(ctx, connection, message.message)
                if err != nil {
                    s.loggerService.LogError(ctx, syserr.Wrap(err, "error processing an incoming message"))
                }
            case websocket.CloseMessage:
                s.loggerService.Info(ctx, "close message received, closing the connection")
                return nil
            }
        }
    }
}

func (s *WebsocketServer) runHandshake(ctx context.Context, connection *WebsocketConnection) (*domain.User, error) {
    for {
        select {
        case <-ctx.Done():
            return nil, syserr.Wrap(context.Canceled, "ctx done, exiting")
        case message := <-connection.incomingChan:
            s.loggerService.Info(ctx, "token message received")
            protoMessage, err := s.unmarshalMessage(&message)
            if err != nil {
                return nil, syserr.WrapAs(err, syserr.BadInputCode, "could not decode message")
            }
            if protoMessage.GetType() != v1.ClientMessageType_CLIENT_MESSAGE_TYPE_TOKEN_UPDATE {
                return nil, syserr.NewBadInput("first message is not of token update, closing the connection")
            }
            token := protoMessage.GetTokenUpdate().GetToken()
            sup, expiry, err := s.authService.ValidateToken(ctx, token)
            if err != nil {
                return nil, syserr.NewBadInput("could not validate token")
            }
            user, err := s.userService.GetUserBySUP(ctx, nil, sup)
            if err != nil {
                return nil, syserr.Wrap(err, "could not get user by sup")
            }
            connection.userID = &user.ID
            connection.userSup = &user.Sup
            connection.expiresAt = lo.ToPtr(time.Unix(expiry, 0))
            return user, nil
        case <-time.After(time.Second * TokenProvisionTimeout):
            return nil, syserr.NewBadInput("no token provided before the timeout, closing the connection")
        }
    }
}

func (s *WebsocketServer) getUpgrader() (*websocket.Upgrader, error) {
    config, err := s.configService.GetConfig()
    if err != nil {
        return nil, syserr.Wrap(err, "could not get config")
    }
    return &websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
        CheckOrigin: func(r *http.Request) bool {
            allowedOrigins := config.Backend.HTTP.Cors.Origin
            if lo.Contains(allowedOrigins, "*") {
                return true
            }
            return lo.Contains(allowedOrigins, r.Header.Get("Origin"))
        },
    }, nil
}

func (s *WebsocketServer) addConnection(connection *WebsocketConnection) error {
    key := connection.GetKey()
    _, hasKey := s.connections.LoadOrStore(key, connection)
    if hasKey {
        return syserr.NewInternal("connection already registered", syserr.F("user_id", connection.userID))
    }
    return nil
}

func (s *WebsocketServer) closeAndRemoveConnection(connection *WebsocketConnection) error {
    err := connection.Close()
    if err != nil {
        return syserr.Wrap(err, "could not close connection")
    }
    if connection.userID != nil {
        key := connection.GetKey()
        s.connections.Delete(key)
    }
    return nil
}

func (s *WebsocketServer) unmarshalMessage(message *incomingMessage) (*v1.ClientMessage, error) {
    var protoMessage v1.ClientMessage
    err := protojson.Unmarshal(message.message, &protoMessage)
    if err != nil {
        return nil, syserr.WrapAs(err, syserr.BadInputCode, "could not unmarshal message")
    }
    return &protoMessage, nil
}

// createIncomingChannel pumps messages from the websocket into a channel,
// so the connection can be served with a single select loop.
func (s *WebsocketServer) createIncomingChannel(ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc) chan incomingMessage {
    messageChan := make(chan incomingMessage)
    go func() {
        for {
            if ctxUtil.IsDone(ctx) {
                types.CloseChannelSafely(messageChan)
                return
            }
            messageType, message, err := conn.ReadMessage()
            if err != nil {
                s.loggerService.LogError(ctx, syserr.Wrap(err, "could not read incoming message"))
                types.CloseChannelSafely(messageChan)
                cancel()
                return
            }
            messageChan <- incomingMessage{
                messageType: messageType,
                message:     message,
            }
        }
    }()
    return messageChan
}

func (s *WebsocketServer) processMessage(ctx context.Context, connection *WebsocketConnection, payload []byte) error {
    var protoMessage v1.ClientMessage
    err := protojson.Unmarshal(payload, &protoMessage)
    if err != nil {
        return err
    }
    switch protoMessage.GetType() {
    case v1.ClientMessageType_CLIENT_MESSAGE_TYPE_TOKEN_UPDATE:
        err = s.processTokenUpdateMessage(ctx, connection, &protoMessage)
    // todo: add more types here when needed
    default:
        err = syserr.NewBadInput("unrecognised message, skipped", syserr.F("payload", string(payload)))
    }
    return err
}

func (s *WebsocketServer) processTokenUpdateMessage(ctx context.Context, connection *WebsocketConnection, protoMessage *v1.ClientMessage) error {
    token := protoMessage.GetTokenUpdate().GetToken()
    sup, expiry, err := s.authService.ValidateToken(ctx, token)
    if err != nil {
        return syserr.NewBadInput("could not validate token")
    }
    if *connection.userSup != sup {
        return syserr.NewBadInput("user in the token is different from the connection owner")
    }
    connection.expiresAt = lo.ToPtr(time.Unix(expiry, 0))
    s.loggerService.Info(ctx, "token message was processed")
    return nil
}
Besides websockets, WebRTC could have been considered as an alternative. There is also Server-Sent Events (SSE), but that technology is mostly used for one-way streaming of large responses and, in my opinion, doesn't work well for long-lived, full-duplex communication.
Since we have more than one service, we need an effective way of asynchronous communication between them. An event bus is a good way to achieve this.
The event bus service is a wrapper around RabbitMQ. A topic is created, and all the services subscribe to it. Event dispatching can be invoked from any part of the codebase, and as long as a recipient is subscribed to the topic, it will get the message.
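A sketch of what such a wrapper does under the hood with github.com/rabbitmq/amqp091-go; the exchange and routing key names are illustrative, not taken from the repo:

package eventbus

import (
    "context"

    amqp "github.com/rabbitmq/amqp091-go"
)

// Publish sends an event to a shared topic exchange; every subscribed
// service receives the messages whose routing keys it binds to.
func Publish(ctx context.Context, ch *amqp.Channel, body []byte) error {
    if err := ch.ExchangeDeclare("events", "topic", true, false, false, false, nil); err != nil {
        return err
    }
    return ch.PublishWithContext(ctx, "events", "image.processed", false, false, amqp.Publishing{
        ContentType: "application/json",
        Body:        body,
    })
}

// Subscribe binds an exclusive queue to the topic and hands messages to the
// callback, mirroring the AddEventListener shape used in the listings below.
func Subscribe(ch *amqp.Channel, callback func(body []byte)) error {
    q, err := ch.QueueDeclare("", false, true, true, false, nil)
    if err != nil {
        return err
    }
    if err := ch.QueueBind(q.Name, "image.*", "events", false, nil); err != nil {
        return err
    }
    msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
    if err != nil {
        return err
    }
    go func() {
        for msg := range msgs {
            callback(msg.Body)
        }
    }()
    return nil
}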
The main() function of the service is the entry point. It initializes the services and starts the server. It also handles graceful shutdown on SIGTERM and SIGINT signals, letting the services clean up their resources and close their connections. The function is also responsible for returning a non-zero exit code in case of an error, so the container can be restarted by the orchestrator.
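A condensed sketch of that wiring, assuming a run function that blocks until the context is done:

package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    // cancel the root context on SIGTERM or SIGINT, giving every service a
    // chance to finish in-flight work and release its resources
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
    defer stop()

    if err := run(ctx); err != nil {
        // a non-zero exit code lets the orchestrator restart the container
        os.Exit(1)
    }
}

// run initializes the services, starts the server and blocks until the
// context is cancelled; the real wiring is omitted here.
func run(ctx context.Context) error {
    <-ctx.Done()
    return nil
}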
🤖 Geez, this AI-assisted writing produces sheer paragraphs of text and is definitely getting out of hand. It feels like a book already. Kudos if you've read this far.
The web dashboard is a simple React app based on Create React App (sadly deprecated as of Feb 2025) and Craco.
Talking to the API is done using react-query. The library runs the service functions generated from the protobuf definitions by Protoweb, as already mentioned.
The application itself is pretty basic, and the only two prominent features are the authentication and working with websockets.
The authentication is built using the @auth0/auth0-react library and wrapped into a context provider. There is a widget in the top bar that shows the currently authenticated user, where the user can also sign out. The user can sign in using the Google provider.
The websockets are based on the react-use-websocket library. Whenever the connection fails, the library makes sure it's restored. The logic also dispatches the access token periodically, as the backend requires. The token is taken from the local storage through the @auth0/auth0-react library mentioned above.
Everything mentioned so far is basically the boilerplate and fuss necessary to accommodate the core of the application: the worker service. Let's dive into it deeper.
Technically speaking, the worker service inherits a lot from the API service in terms of code structure, logging, error handling and so on.
The worker service contains a worker pool, which is responsible for image processing.
❓ Why are workers needed?
In order to process an image we need to do two things:
- Detect the faces in the image.
- Blur the faces.
These are two pretty heavy operations, and for the service to be reliable and scalable, they cannot be done synchronously on a user request. The image processing must be asynchronous (which means a user will have to wait longer), and the worker service is responsible for that.
Depending on the load, the worker pod can scale horizontally both instance-wise (using partitioning and load balancing) and CPU-wise (by increasing the capacity of the worker pool), grinding through the load faster.
❓ Why is the job queue needed?
These are the three key properties we need from the job queue:
- Order of delivery - the jobs must be picked up in the order they were submitted.
- Exactly-once delivery - the job queue must guarantee that each job is processed exactly once.
- Crash tolerance - the job queue must survive failures of the workers and of the service itself, picking up the jobs where they left off.
In my opinion, message brokers are not the best fit for a job queue, because of:
- Order of delivery - message brokers do not guarantee the order of messages, which is crucial for a job queue.
- Exactly-once delivery - brokers can't guarantee that a message is delivered exactly once.
However, I will still receive the jobs from the API pod via RabbitMQ and store them in the database. The workers will then pick the jobs up for execution.
❓ Why build a custom job queue?
Funny answer: every software engineer, besides planting a tree, building a house and raising a son should also build a website, a database wrapper and... a job queue.
Real answer: mainly because of the maintenance costs of a broker and insufficient flexibility. But, of course, any existing solution could have been used.
The code of the job queue is of special interest, so I'm posting the listing:
package imageProcessor

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/google/uuid"
    "github.com/samber/lo"
    otelMetric "go.opentelemetry.io/otel/metric"

    "backend/interfaces"
    "backend/internal/database"
    "backend/internal/domain"
    ctxUtil "backend/internal/util/ctx"
    imageUtil "backend/internal/util/image"
    "backend/internal/util/logger"
    "backend/internal/util/syserr"
    typeUtil "backend/internal/util/types"
)

const (
    taskBufferThreshold           = 5
    imageProcessingQueueBatchSize = 30
    meterName                     = "image_processor"
)

type Service struct {
    configService        interfaces.ConfigService
    eventBusService      interfaces.EventBusService
    loggerService        interfaces.LoggerService
    imageQueueRepository interfaces.ImageProcessingQueueRepository
    imageRepository      interfaces.ImageRepository
    faceDetectionService interfaces.FaceDetectionService
    storageService       interfaces.StorageService
    monitoringService    interfaces.MonitoringService
    hasNewTasks          atomic.Bool
    taskBuffer           sync.Map
    channel              chan database.ImageProcessingQueue
}

func NewImageProcessor(
    configService interfaces.ConfigService,
    eventBusService interfaces.EventBusService,
    loggerService interfaces.LoggerService,
    imageQueueRepository interfaces.ImageProcessingQueueRepository,
    imageRepository interfaces.ImageRepository,
    faceDetectionService interfaces.FaceDetectionService,
    storageService interfaces.StorageService,
    monitoringService interfaces.MonitoringService,
) *Service {
    result := &Service{
        configService:        configService,
        eventBusService:      eventBusService,
        loggerService:        loggerService,
        imageQueueRepository: imageQueueRepository,
        imageRepository:      imageRepository,
        channel:              make(chan database.ImageProcessingQueue),
        faceDetectionService: faceDetectionService,
        storageService:       storageService,
        monitoringService:    monitoringService,
    }
    result.hasNewTasks.Store(true) // upon startup check if there are some tasks
    return result
}

func (s *Service) Start(ctx context.Context) error {
    var wg sync.WaitGroup

    err := s.init(ctx, &wg)
    if err != nil {
        return syserr.Wrap(err, "could not initialize")
    }

    callback := func(event *domain.EventBusEvent) {
        s.loggerService.Info(ctx, "worker: new event received", logger.F("event", event))
        s.hasNewTasks.Store(true)
    }
    err = s.eventBusService.AddEventListener(domain.EventBusEventTypeImageCreated, callback)
    if err != nil {
        return syserr.Wrap(err, "could not start listening to events")
    }

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                err = syserr.Wrap(ctx.Err(), "context is done")
                return
            default:
                if s.hasNewTasks.Swap(false) {
                    err = s.ProcessImages(ctx)
                    if err != nil {
                        s.loggerService.LogError(ctx, syserr.Wrap(err, "could not process images"))
                    }
                }
                // avoid a busy loop while waiting for new tasks
                time.Sleep(100 * time.Millisecond)
            }
        }
    }()

    wg.Wait()
    return err
}

func (s *Service) ProcessImages(ctx context.Context) error {
    s.loggerService.Info(ctx, "processing images")
    for {
        select {
        case <-ctx.Done():
            return syserr.Wrap(ctx.Err(), "context is done")
        default:
            if typeUtil.GetSyncMapSize(&s.taskBuffer) < taskBufferThreshold {
                // the buffer is getting empty, let's add some items
                res, err := s.imageQueueRepository.List(ctx, nil, database.ImageProcessingQueueListParameters{
                    Filter: &database.ImageProcessingQueueFilter{
                        IsFailed:    lo.ToPtr(false),
                        IsCompleted: lo.ToPtr(false),
                    },
                    Pagination: &database.Pagination{
                        PageNumber: 1,
                        PageSize:   imageProcessingQueueBatchSize,
                    },
                })
                if err != nil {
                    return syserr.Wrap(err, "could not list images")
                }
                s.loggerService.Info(ctx, "found images", logger.F("count", len(res)))
                if len(res) == 0 {
                    // nothing left to do
                    return nil
                }
                wasAdded := false
                for _, task := range res {
                    if _, ok := s.taskBuffer.Load(task.ID); !ok {
                        s.taskBuffer.Store(task.ID, task)
                        wasAdded = true
                        s.channel <- task
                    }
                }
                if !wasAdded {
                    // all tasks are already in the buffer, exiting
                    return nil
                }
            }
            time.Sleep(time.Second)
        }
    }
}

func (s *Service) Stop() error {
    // todo: implement this
    // s.eventBusService.RemoveEventListener(domain.EventBusEventTypeImageCreated, callback)
    return nil
}

func (s *Service) init(ctx context.Context, wg *sync.WaitGroup) error {
    config, err := s.configService.GetConfig()
    if err != nil {
        return syserr.Wrap(err, "could not extract config")
    }
    workerPoolSize := config.Backend.Worker.ThreadCount
    if workerPoolSize < 1 {
        return syserr.NewInternal("invalid worker pool size", syserr.F("value", workerPoolSize))
    }
    for i := 0; i < workerPoolSize; i++ {
        wg.Add(1)
        go s.processImages(ctx, i, wg)
    }

    // init metrics
    s.recordError(ctx, 0)
    s.recordSuccess(ctx, 0)
    s.recordDuration(ctx, 0)

    return nil
}

func (s *Service) processImages(ctx context.Context, workerId int, wg *sync.WaitGroup) {
    defer func() {
        wg.Done()
        s.loggerService.Info(ctx, fmt.Sprintf("worker %d exited", workerId), logger.F("workerId", workerId))
    }()
    s.loggerService.Info(ctx, fmt.Sprintf("worker %d started", workerId), logger.F("workerId", workerId))
    for {
        select {
        case <-ctx.Done():
            return
        case task := <-s.channel:
            operationID := uuid.New().String()
            processCtx := ctxUtil.WithOperationID(ctx, operationID)
            err := s.processTask(processCtx, task)
            if err != nil {
                s.loggerService.LogError(processCtx, syserr.Wrap(err, "could not process task"))
                err = s.markTaskFailed(processCtx, task, operationID, err.Error())
                if err != nil {
                    s.loggerService.LogError(processCtx, syserr.Wrap(err, "could not update image processing queue"))
                }
                err = s.markImageProcessed(processCtx, task.ImageID, true, nil)
                if err != nil {
                    s.loggerService.LogError(processCtx, syserr.Wrap(err, "could not mark image processed"))
                }
                s.recordError(processCtx, 1)
            }
            s.taskBuffer.Delete(task.ID)
        }
    }
}

func (s *Service) processTask(processCtx context.Context, task database.ImageProcessingQueue) error {
    startTime := time.Now()

    config, err := s.configService.GetConfig()
    if err != nil {
        return syserr.Wrap(err, "could not extract config")
    }

    s.loggerService.Info(processCtx, "processing image", logger.F("imageId", task.ID))
    operationID := ctxUtil.GetOperationID(processCtx)

    taskCtx, cancelTaskCtx := context.WithTimeout(processCtx, time.Second*15)
    defer cancelTaskCtx()

    var detections []*domain.BoundingBox

    imageElement, err := s.imageRepository.GetByID(taskCtx, nil, task.ImageID)
    if err != nil {
        return syserr.Wrap(err, "could not get image")
    }
    if imageElement == nil {
        return syserr.NewInternal("image not found", syserr.F("id", task.ID))
    }

    image, err := imageUtil.DownloadImage(imageElement.OriginalURL)
    if err != nil {
        return syserr.Wrap(err, "could not download image")
    }

    detections, err = s.faceDetectionService.Detect(taskCtx, image)
    if err != nil {
        return syserr.Wrap(err, "could not detect faces")
    }
    if ctxUtil.IsTimeouted(taskCtx) {
        return syserr.Wrap(taskCtx.Err(), "context is done")
    }

    image, err = imageUtil.BlurBoxes(image, detections, 9.0)
    if err != nil {
        return syserr.Wrap(err, "could not blur faces")
    }

    buffer, err := imageUtil.EncodeImage(image, "jpg", 90)
    if err != nil {
        return syserr.Wrap(err, "could not encode image")
    }

    writer, err := s.storageService.GetWriter(taskCtx, config.Storage.ImageBucketName, operationID)
    if err != nil {
        return syserr.Wrap(err, "could not get writer")
    }
    defer writer.Close()

    _, err = writer.Write(buffer.Bytes())
    if err != nil {
        return syserr.Wrap(err, "could not write image")
    }
    if ctxUtil.IsTimeouted(taskCtx) {
        return syserr.Wrap(taskCtx.Err(), "context is done")
    }

    err = s.markImageProcessed(processCtx, task.ImageID, false, lo.ToPtr(s.storageService.GetPublicURL(config.Storage.ImageBucketName, operationID)))
    if err != nil {
        return syserr.Wrap(err, "could not update image")
    }

    err = s.markTaskSucessful(processCtx, task, operationID)
    if err != nil {
        return syserr.Wrap(err, "could not update image processing queue")
    }

    err = s.eventBusService.TriggerEvent(&domain.EventBusEvent{
        Type: domain.EventBusEventTypeImageProcessed,
        Payload: &domain.EventBusEventPayloadImageProcessed{
            ImageID:   task.ID,
            CreatorID: task.CreatedBy.String(),
        },
    })
    if err != nil {
        return syserr.Wrap(err, "could not trigger event bus event")
    }

    s.loggerService.Info(processCtx, "image was processed", logger.F("imageId", task.ID))
    s.recordSuccess(processCtx, 1)
    s.recordDuration(processCtx, time.Since(startTime))

    return nil
}

func (s *Service) markTaskSucessful(ctx context.Context, task database.ImageProcessingQueue, operationID string) error {
    return s.imageQueueRepository.Update(ctx, nil, &database.ImageProcessingQueueUpdate{
        ID:          task.ID,
        OperationID: &database.FieldValue[*string]{Value: &operationID},
        IsCompleted: &database.FieldValue[*bool]{Value: lo.ToPtr(true)},
        CompletedAt: &database.FieldValue[*time.Time]{Value: lo.ToPtr(time.Now().UTC())},
    })
}

func (s *Service) markTaskFailed(ctx context.Context, task database.ImageProcessingQueue, operationID string, reason string) error {
    return s.imageQueueRepository.Update(ctx, nil, &database.ImageProcessingQueueUpdate{
        ID:            task.ID,
        OperationID:   &database.FieldValue[*string]{Value: &operationID},
        IsCompleted:   &database.FieldValue[*bool]{Value: lo.ToPtr(false)},
        IsFailed:      &database.FieldValue[*bool]{Value: lo.ToPtr(true)},
        CompletedAt:   &database.FieldValue[*time.Time]{Value: nil},
        FailureReason: &database.FieldValue[*string]{Value: &reason},
    })
}

func (s *Service) markImageProcessed(ctx context.Context, imageID uuid.UUID, failed bool, url *string) error {
    return s.imageRepository.Update(ctx, nil, &database.ImageUpdate{
        ID:          imageID,
        IsProcessed: &database.FieldValue[*bool]{Value: lo.ToPtr(true)},
        IsFailed:    &database.FieldValue[*bool]{Value: &failed},
        URL:         &database.FieldValue[*string]{Value: url},
    })
}

var histogramBoundaries = []float64{
    500.0, 1000.0, 1500.0, 2000.0, 2500.0, 3000.0, 3500.0, 4000.0, 4500.0, 5000.0,
    5500.0, 6000.0, 6500.0, 7000.0, 7500.0, 8000.0, 8500.0, 9000.0, 9500.0, 10000.0,
    10500.0, 11000.0, 11500.0, 12000.0, 12500.0, 13000.0, 13500.0, 14000.0, 14500.0,
    15000.0, 15500.0, 16000.0, 16500.0, 17000.0, 17500.0, 18000.0, 18500.0, 19000.0,
    19500.0, 20000.0, 20500.0, 21000.0, 21500.0, 22000.0, 22500.0, 23000.0, 23500.0,
    24000.0, 24500.0, 25000.0, 25500.0, 26000.0, 26500.0, 27000.0, 27500.0, 28000.0,
    28500.0, 29000.0, 29500.0, 30000.0,
}

func (s *Service) recordError(ctx context.Context, value int64) {
    s.monitoringService.AddInt64Counter(ctx, meterName, "error", value, "", "")
}

func (s *Service) recordSuccess(ctx context.Context, value int64) {
    s.monitoringService.AddInt64Counter(ctx, meterName, "processed_images", value, "", "")
}

func (s *Service) recordDuration(ctx context.Context, duration time.Duration) {
    s.monitoringService.RecordInt64Histogram(ctx, meterName, "image_processing_duration", duration.Milliseconds(), "", "", otelMetric.WithExplicitBucketBoundaries(histogramBoundaries...))
}
The job queue service:
- Creates and starts the worker pool, using the number of threads specified in the configuration.
- Subscribes to the event bus to receive new jobs.
- As soon as the hasNewTasks flag is set to true, it starts reading the jobs from the database and sending them to the workers for processing, until there is nothing left.
- The jobs are read in batches to avoid OOM situations.
- When the queue is empty, it sets the flag back to false.
- hasNewTasks is set to true by default, so in the event of a crash the service picks the old jobs up. As soon as a new notification arrives, the flag is set to true again. Thus, the queue does not poll the database.
- When a worker is done processing an image, it:
  - marks the job as completed or failed,
  - uploads the processed image to the storage,
  - updates the image record,
  - dispatches an event to the event bus, so the API can notify the web client through the websocket connection.
- The service also records metrics for the successes, failures and duration of the image processing.
Upon receiving a new job, the service starts processing the image. The processing includes face detection and blurring of the faces found.
The face detection is powered by a fine-tuned YOLOv8 model, converted to the ONNX format for CPU-based inference using the ONNX Runtime engine and its Go binding, onnxruntime_go. There is apparently a competing library, onnx-go, but for some reason it didn't work for me.
The ONNX Runtime engine is a C++ library, so a suitable build is picked from a list of pre-compiled libraries matching the current system.
The model is loaded from the local file system using a path held in config.Backend.Worker.ModelPath (the BACKEND_WORKER_MODEL_PATH env variable).
I am no ML expert, so I tried to vibe-code this part with Claude, but nothing good came out of it. This proves my theory that vendor LLMs are only trained to generate code for specific areas, such as web development.
Eventually, I stumbled across an example that does exactly what I needed, so I borrowed a lot from there.
The model only accepts images at 640x640px resolution, so the image is resized using the Lanczos3 algorithm. The bounding boxes are then detected, each with a certain confidence. This means we can potentially get overlapping bounding boxes, so the overlapping boxes are merged to make the result cleaner. The remaining bounding boxes are then upscaled to the original image resolution.
Here is the listing of the service:
package facedetection

import (
    "context"
    "fmt"
    "image"
    "runtime"
    "sync"

    "github.com/nfnt/resize"
    ort "github.com/yalue/onnxruntime_go"

    "backend/interfaces"
    "backend/internal/domain"
    imageUtil "backend/internal/util/image"
    "backend/internal/util/logger"
    "backend/internal/util/syserr"
)

type modelSession struct {
    Session *ort.AdvancedSession
    Input   *ort.Tensor[float32]
    Output  *ort.Tensor[float32]
}

func (m *modelSession) Destroy() {
    m.Session.Destroy()
    m.Input.Destroy()
    m.Output.Destroy()
}

type Service struct {
    configService    interfaces.ConfigService
    loggerService    interfaces.LoggerService
    ortCreated       bool
    ortCreationMutex sync.Mutex
}

func NewService(configService interfaces.ConfigService, loggerService interfaces.LoggerService) *Service {
    return &Service{
        configService:    configService,
        loggerService:    loggerService,
        ortCreationMutex: sync.Mutex{},
        ortCreated:       false,
    }
}

func (s *Service) Detect(ctx context.Context, image image.Image) ([]*domain.BoundingBox, error) {
    originalWidth := image.Bounds().Canon().Dx()
    originalHeight := image.Bounds().Canon().Dy()

    modelSession, e := s.initSession()
    if e != nil {
        return nil, syserr.Wrap(e, "could not create model session")
    }
    defer modelSession.Destroy()

    e = s.prepareInput(image, modelSession.Input)
    if e != nil {
        return nil, syserr.Wrap(e, "could not convert image to network input")
    }

    e = modelSession.Session.Run()
    if e != nil {
        return nil, syserr.Wrap(e, "could not run session")
    }

    boundingBoxes := s.processOutput(ctx, modelSession.Output.GetData(), originalWidth, originalHeight)
    // merge overlapping detections to keep the result clean
    boundingBoxes = imageUtil.FilterDistinctBoxes(boundingBoxes, 0.45, 0.9, 0)

    result := make([]*domain.BoundingBox, len(boundingBoxes))
    for i, boundingBox := range boundingBoxes {
        result[i] = &domain.BoundingBox{
            X1: boundingBox.X1,
            Y1: boundingBox.Y1,
            X2: boundingBox.X2,
            Y2: boundingBox.Y2,
        }
    }

    return result, nil
}

func (s *Service) prepareInput(pic image.Image, dst *ort.Tensor[float32]) error {
    data := dst.GetData()
    channelSize := 640 * 640
    if len(data) < (channelSize * 3) {
        return syserr.NewInternal(fmt.Sprintf("destination tensor only holds %d floats, needs %d (make sure it's the right shape!)", len(data), channelSize*3))
    }
    redChannel := data[0:channelSize]
    greenChannel := data[channelSize : channelSize*2]
    blueChannel := data[channelSize*2 : channelSize*3]

    // Resize the image to 640x640 using the Lanczos3 algorithm
    pic = resize.Resize(640, 640, pic, resize.Lanczos3)
    i := 0
    for y := 0; y < 640; y++ {
        for x := 0; x < 640; x++ {
            r, g, b, _ := pic.At(x, y).RGBA()
            redChannel[i] = float32(r>>8) / 255.0
            greenChannel[i] = float32(g>>8) / 255.0
            blueChannel[i] = float32(b>>8) / 255.0
            i++
        }
    }
    return nil
}

func (s *Service) getSharedLibPath() (string, error) {
    if runtime.GOOS == "windows" {
        if runtime.GOARCH == "amd64" {
            return "./third_party/onnxruntime.dll", nil
        }
    }
    if runtime.GOOS == "darwin" {
        if runtime.GOARCH == "arm64" {
            return "./third_party/onnxruntime_arm64.dylib", nil
        }
        if runtime.GOARCH == "amd64" {
            return "./third_party/onnxruntime_amd64.dylib", nil
        }
    }
    if runtime.GOOS == "linux" {
        if runtime.GOARCH == "arm64" {
            return "./third_party/onnxruntime_arm64.so", nil
        }
        return "./third_party/onnxruntime.so", nil
    }
    return "", syserr.NewInternal("unable to find a version of the onnxruntime library supporting this system")
}

func (s *Service) processOutput(ctx context.Context, output []float32, originalWidth, originalHeight int) []imageUtil.BoundingBox {
    boundingBoxes := make([]imageUtil.BoundingBox, 0)
    detectionCount := len(output) / 5 // ensure we don't go out of bounds
    for idx := 0; idx < detectionCount; idx++ {
        if idx >= len(output) || 8400+idx >= len(output) || 2*8400+idx >= len(output) || 3*8400+idx >= len(output) {
            s.loggerService.Info(ctx, "skipping idx out of bounds", logger.F("idx", idx))
            continue
        }
        xc, yc := output[idx], output[8400+idx]
        w, h := output[2*8400+idx], output[3*8400+idx]
        confidence := output[4*8400+idx]
        if confidence < 0.5 {
            continue
        }
        // scale the box from the 640x640 model space back to the original image
        x1 := (xc - w/2) / 640 * float32(originalWidth)
        y1 := (yc - h/2) / 640 * float32(originalHeight)
        x2 := (xc + w/2) / 640 * float32(originalWidth)
        y2 := (yc + h/2) / 640 * float32(originalHeight)
        boundingBoxes = append(boundingBoxes, imageUtil.BoundingBox{
            Confidence: confidence,
            X1:         x1,
            Y1:         y1,
            X2:         x2,
            Y2:         y2,
        })
    }
    return boundingBoxes
}

func (s *Service) initORT() error {
    s.ortCreationMutex.Lock()
    defer s.ortCreationMutex.Unlock()
    // the created flag is checked under the lock to avoid a data race
    if s.ortCreated {
        return nil
    }
    libraryPath, err := s.getSharedLibPath()
    if err != nil {
        return syserr.Wrap(err, "could not get library path")
    }
    ort.SetSharedLibraryPath(libraryPath)
    err = ort.InitializeEnvironment()
    if err != nil {
        return syserr.Wrap(err, "error initializing ORT environment")
    }
    s.ortCreated = true
    return nil
}

func (s *Service) initSession() (*modelSession, error) {
    config, err := s.configService.GetConfig()
    if err != nil {
        return nil, syserr.Wrap(err, "could not get config")
    }

    err = s.initORT()
    if err != nil {
        return nil, syserr.Wrap(err, "could not initialize ORT")
    }

    inputShape := ort.NewShape(1, 3, 640, 640)
    inputTensor, err := ort.NewEmptyTensor[float32](inputShape)
    if err != nil {
        return nil, syserr.Wrap(err, "error creating input tensor")
    }

    outputShape := ort.NewShape(1, 5, 8400)
    outputTensor, err := ort.NewEmptyTensor[float32](outputShape)
    if err != nil {
        inputTensor.Destroy()
        return nil, syserr.Wrap(err, "error creating output tensor")
    }

    options, err := ort.NewSessionOptions()
    if err != nil {
        inputTensor.Destroy()
        outputTensor.Destroy()
        return nil, syserr.Wrap(err, "error creating ORT session options")
    }
    defer options.Destroy()

    // If CoreML is enabled, append the CoreML execution provider
    if config.Backend.Worker.UseCoreML {
        err = options.AppendExecutionProviderCoreML(0)
        if err != nil {
            inputTensor.Destroy()
            outputTensor.Destroy()
            return nil, syserr.Wrap(err, "error enabling CoreML")
        }
    }

    session, err := ort.NewAdvancedSession(
        config.Backend.Worker.ModelPath,
        []string{"images"}, []string{"output0"},
        []ort.ArbitraryTensor{inputTensor},
        []ort.ArbitraryTensor{outputTensor},
        options,
    )
    if err != nil {
        inputTensor.Destroy()
        outputTensor.Destroy()
        return nil, syserr.Wrap(err, "error creating ORT session")
    }

    return &modelSession{
        Session: session,
        Input:   inputTensor,
        Output:  outputTensor,
    }, nil
}
🚨 There is one gotcha: the model is being loaded on every user request, which is highly suboptimal. The model should be loaded once and then cached, so this part must be refactored, should the code ever reach production.
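One possible shape for that refactor, building on the types from the listing above and assuming a session may be reused across requests: create the session once with sync.Once and serialize Run() calls with a mutex, since the session's fixed input/output tensors are shared mutable state. A pool of sessions sized to the worker count would be the concurrent alternative.

// within the facedetection package; "sync" is already imported
type sessionCache struct {
    once    sync.Once
    mutex   sync.Mutex
    session *modelSession
    err     error
}

// withSession lazily creates the model session exactly once per process and
// then runs the given function against it under a mutex.
func (c *sessionCache) withSession(create func() (*modelSession, error), run func(*modelSession) error) error {
    c.once.Do(func() {
        c.session, c.err = create()
    })
    if c.err != nil {
        return c.err
    }
    // serialize inference: the fixed input/output tensors are shared state
    c.mutex.Lock()
    defer c.mutex.Unlock()
    return run(c.session)
}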
The image blurring is done using the disintegration/imaging library. There isn't anything special about it. Originally I planned to use sharp, but then decided to refrain from introducing another external dependency.
Here is the code:
package image

import (
    "bytes"
    "image"
    _ "image/gif" // register GIF format
    "image/jpeg"  // also registers JPEG format
    "image/png"   // also registers PNG format
    "net/http"
    "strings"

    "github.com/disintegration/imaging"

    "backend/internal/domain"
    "backend/internal/util/syserr"
)

func DownloadImage(url string) (image.Image, error) {
    // Fetch the image
    resp, err := http.Get(url)
    if err != nil {
        return nil, syserr.Wrap(err, "failed to fetch image")
    }
    defer resp.Body.Close()

    // Check if the HTTP status is OK
    if resp.StatusCode != http.StatusOK {
        return nil, syserr.NewInternal("received non-200 status code", syserr.F("code", resp.StatusCode))
    }

    // Decode the image
    img, _, err := image.Decode(resp.Body)
    if err != nil {
        return nil, syserr.Wrap(err, "failed to decode image")
    }
    return img, nil
}

func BlurBoxes(img image.Image, boxes []*domain.BoundingBox, blurRadius float64) (image.Image, error) {
    // Create a clone of the original image to modify
    dst := imaging.Clone(img)

    // Process each bounding box
    for _, box := range boxes {
        // Convert float32 coordinates to integers
        x1, y1 := int(box.X1), int(box.Y1)
        x2, y2 := int(box.X2), int(box.Y2)

        // Ensure coordinates are within bounds
        bounds := dst.Bounds()
        x1 = max(x1, bounds.Min.X)
        y1 = max(y1, bounds.Min.Y)
        x2 = min(x2, bounds.Max.X)
        y2 = min(y2, bounds.Max.Y)

        // Skip invalid boxes
        if x2 <= x1 || y2 <= y1 {
            continue
        }

        // Extract the region to blur
        region := imaging.Crop(dst, image.Rect(x1, y1, x2, y2))

        // Apply Gaussian blur to the region
        blurred := imaging.Blur(region, blurRadius)

        // Paste the blurred region back
        dst = imaging.Paste(dst, blurred, image.Pt(x1, y1))
    }

    return dst, nil
}

func EncodeImage(img image.Image, format string, quality int) (*bytes.Buffer, error) {
    var buf bytes.Buffer
    var err error

    format = strings.ToLower(format)
    switch format {
    case "jpeg", "jpg":
        err = jpeg.Encode(&buf, img, &jpeg.Options{Quality: quality})
    case "png":
        err = png.Encode(&buf, img)
    default:
        return nil, syserr.NewInternal("unsupported format", syserr.F("format", format))
    }
    if err != nil {
        return nil, syserr.Wrap(err, "error encoding image")
    }
    return &buf, nil
}
As we can see, the backend application contains a lot of features, and all of them are essential for a production-ready service to function as expected.
The application is not containerized yet: there are no Dockerfiles. The application also needs to be deployed to an orchestrator, such as Kubernetes. For that, the corresponding infrastructure setup must be done, preferably following the infra-as-code approach. CI/CD must be put in place to build new images and push them to the container registry, and the deployment process must be automated using tools like Spinnaker or ArgoCD.
This was a nice project to work on. It covers a lot of topics and technologies relevant to modern cloud-native applications and fullstack development. Certainly, there is still a lot to talk about, like DevOps, CI/CD, security and so on, but it's a topic for another article.
The code is, as always, available on GitHub. Enjoy!
