
Working with SQS in Go, the pro way

Andrew Grachov

If you work with a modern backend technology stack, you most probably work with microservices, divided along bounded contexts, with strong team ownership.

In the microservices world, communication happens in one of two ways: synchronous and asynchronous. With synchronous-style communication, everything is relatively simple.

  1. Service B exposes endpoints that service A can call.
  2. Whoever calls service A waits for the request to be processed by both services before obtaining the final result.

If service B is responsible for email sending:

(Diagram: email send scenario, synchronous communication)
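For illustration, a synchronous call from service A to service B might look like the minimal sketch below; the /send-email endpoint, the service-b host and the payload shape are assumptions made only for this example.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// sendEmailSync shows the synchronous style: service A blocks until
// service B has finished sending the email (or has failed to).
func sendEmailSync(email, msg string) error {
	payload, err := json.Marshal(map[string]string{"email": email, "message": msg})
	if err != nil {
		return err
	}
	// Hypothetical endpoint exposed by service B.
	resp, err := http.Post("http://service-b/send-email", "application/json", bytes.NewReader(payload))
	if err != nil {
		return err // a network failure surfaces directly to the caller
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("email service returned status %d", resp.StatusCode)
	}
	return nil
}

func main() {
	if err := sendEmailSync("user@example.com", "Hello!"); err != nil {
		log.Println("email send failed:", err)
	}
}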

Alongside the "synchronous" naming, this can also be described as "orchestration": direct management of the data flow within the scope of one user request.

While it's not complex to implement and gives strong consistency, this approach has a couple of downsides:

  1. Service B can be down, and then the whole system will not work: tasks will be lost.
  2. Networks can fail. The more services you have to call within one request, the more error-prone it becomes.
  3. Latency. Calling an extra service and waiting for the task to complete adds overhead that is not tolerable in some systems.
  4. Sudden spikes in traffic can take down both services and the underlying infrastructure.
  5. A direct dependency creates additional coupling, which lowers the system's agility and ability to scale.
  6. Some scenarios cannot be handled this way at all, such as heavy, resource-consuming operations that run longer than the HTTP timeout.

The asynchronous approach

The other approach is to use an asynchronous communication style, or "choreography", using a message broker and message queues.

(Diagram: email send, asynchronous scenario)

What benefits does it bring?

  1. Service B downtime only delays email sending; no data is lost, as the message stays in the queue until it is processed.
  2. The system is resistant to network failures.
  3. The user receives a response instantly, without waiting for the actual sending to happen, which makes for a great customer experience.
  4. Traffic spikes likewise only delay processing.
  5. There is no direct dependency between services: service A knows nothing about service B (the email service).
  6. It is friendly to heavy computational tasks, since all the work is done in the background.

Working with message brokers follows the Publisher/Subscriber pattern.

So in our example, service A will be the publisher (data producer) and service B will be the subscriber (data consumer). What about choosing a message broker?

Choosing the message broker

There are plenty of popular message broker solutions as of 2021: Kafka, RabbitMQ, GCP Pub/Sub, NATS, SQS/SNS, etc. In this article we will cover the AWS SQS message queue, as one of the most popular and easiest to use. Different message brokers provide different drivers, but almost all of them require some extra work on top in order to be production-ready and to communicate reliably.

Working with the message publisher

Publishing to SQS is pretty straightforward and usually doesn't require any special tuning. However, if we aim to send millions of messages, some additional work needs to be done. In this article we will not dive deep into rapid mass sending, in order to stay focused on the consumer side.

package main

import (
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// Globals shared by the snippets in this article; AWS_REGION, QUEUE_URL and
// loadEmailsFromCSV are configuration and helpers defined elsewhere.
var (
	svc      *sqs.SQS
	queueURL string
	emails   []string
)

func sendEmail(email, msg string) error {
	_, err := svc.SendMessage(&sqs.SendMessageInput{
		DelaySeconds: aws.Int64(0),
		MessageAttributes: map[string]*sqs.MessageAttributeValue{
			"email": {
				DataType:    aws.String("String"),
				StringValue: aws.String(email),
			},
		},
		MessageBody: aws.String(msg),
		QueueUrl:    &queueURL,
	})
	return err
}

func main() {
	// load 10000 emails into the emails slice
	loadEmailsFromCSV()

	sess, err := session.NewSession(&aws.Config{
		Region: aws.String(AWS_REGION),
	})
	if err != nil {
		panic(err)
	}

	svc = sqs.New(sess)
	queueURL = QUEUE_URL
	msg := "Hello from test golang application!"

	start := time.Now()

	for i := range emails {
		log.Println("sending message:", i)
		if err := sendEmail(emails[i], msg); err != nil {
			log.Println("EMAIL send error:", err)
		}
	}

	elapsed := time.Since(start)
	log.Println("Elapsed:", elapsed)
}
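The loop above sends messages one at a time. For rapid mass sending (which this article otherwise skips), one option is the same token-bucket idea the consumer uses later: a buffered channel that caps how many SendMessage calls are in flight. A minimal sketch, assuming sendEmail and the globals from the snippet above plus the sync import; the concurrency value of 50 is an arbitrary assumption.

// publishConcurrent caps in-flight SendMessage calls with a buffered
// channel acting as a token bucket.
func publishConcurrent(emails []string, msg string) {
	concurrency := 50 // assumed value; tune to your account limits
	tokens := make(chan struct{}, concurrency)
	var wg sync.WaitGroup

	for _, email := range emails {
		tokens <- struct{}{} // blocks while `concurrency` sends are already in flight
		wg.Add(1)
		go func(email string) {
			defer wg.Done()
			defer func() { <-tokens }() // return the token to the bucket
			if err := sendEmail(email, msg); err != nil {
				log.Println("EMAIL send error:", err)
			}
		}(email)
	}
	wg.Wait()
}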

Working with the message consumer

With an SQS message subscriber you get messages in small batches out of the box; however, processing them should follow pretty much the same approach we had with publishing.
For our message consumer to achieve high throughput, we will process messages in parallel, and for this to be robust, we need to impose a limit on how many messages we process simultaneously.

As the standard AWS SQS receive call gives us a maximum of 10 messages, the naive approach is to process them in parallel, then fetch the next batch.

// SQS visibility timeout in seconds (assumed value; tune to your processing time).
var timeout int64 = 30

func consumeNaive() {
	for {
		msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
			AttributeNames: []*string{
				aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
			},
			MessageAttributeNames: []*string{
				aws.String(sqs.QueueAttributeNameAll),
			},
			QueueUrl:            &queueURL,
			MaxNumberOfMessages: aws.Int64(10),
			VisibilityTimeout:   &timeout,
		})
		if err != nil {
			log.Println("Receive message error:", err)
			continue
		}
		if len(msgResult.Messages) == 0 { // add an additional sleep if the queue is empty
			time.Sleep(1 * time.Second)
			continue
		}

		wg := sync.WaitGroup{}
		for i := range msgResult.Messages {
			wg.Add(1)
			go func(msg *sqs.Message) {
				defer wg.Done()
				email := msg.MessageAttributes["email"].StringValue
				body := msg.Body
				processEmailMessage(*email, *body)
				if _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
					QueueUrl:      &queueURL,
					ReceiptHandle: msg.ReceiptHandle,
				}); err != nil {
					log.Println("Deleting message error:", err)
				}
			}(msgResult.Messages[i])
		}
		// wait for the whole batch before requesting the next one
		wg.Wait()
	}
}

With an approach like this we are limited to (1 second / slowest message processing time in the batch) × 10. For example, if the slowest message takes 50 ms to process, that gives us (1000 ms / 50 ms) × 10 = 200 messages per second, minus network latency, which can eat up most of the projected capacity.

What we are going to do instead is create a limited parallel worker pool, and keep polling AWS until the whole limit is in use.

Again, as in the publisher example, we will create a "token bucket" using a buffered channel:

var sqsSvc *sqs.SQS

func createFullBufferedChannel(capacity int) chan bool {
	sync := make(chan bool, capacity)

	for i := 0; i < capacity; i++ {
		sync <- true
	}
	return sync
}
func consumeConcurrent() {
	concurrency := 200
	sync := createFullBufferedChannel(concurrency)
	for { // create an infinite processing loop
		msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
			AttributeNames: []*string{
				aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
			},
			MessageAttributeNames: []*string{
				aws.String(sqs.QueueAttributeNameAll),
			},
			QueueUrl:            &queueURL,
			MaxNumberOfMessages: aws.Int64(10),
			VisibilityTimeout:   &timeout,
		})
		if err != nil {
			// TODO: add error notification
			log.Println("Receive message error: ", err)
			continue
		}
		if len(msgResult.Messages) == 0 {
			time.Sleep(1 * time.Second)
			continue
		}

		for i := range msgResult.Messages {
			// request a "worker" from the pool;
			// an empty buffer will block this operation
			<-sync

			go func(msg *sqs.Message) {
				email := msg.MessageAttributes["email"].StringValue
				body := msg.Body
				processEmailMessage(*email, *body)
				if _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
					QueueUrl:      &queueURL,
					ReceiptHandle: msg.ReceiptHandle,
				}); err != nil {
					log.Println("Deleting message error", err)
				}
				// return the "worker" to the "pool"
				sync <- true
			}(msgResult.Messages[i])
		}
	}
}

With an approach like this, we are limited only by the AWS throughput.

Making the code even more robust

Some scenarios consume a different amount of resources depending on the message type (let's say you want your handler to be able to process from 1 to N emails in one message). To maintain our limits, we could introduce a time-based token bucket algorithm, which ensures we don't process more than N emails over a period of time (like one minute) by grabbing the exact number of "worker tokens" from the pool, depending on the email count in the message. Also, if your code can time out, a good approach is to impose a timeout and cancellation based on Go's context.WithCancel function. Check out the golang.org/x/sync/semaphore library to build a nuclear-resistant solution (the mechanics are the same as in our example, abstracted into a library: instead of using a channel to limit our operations, we call semaphore.Acquire, which also blocks execution until enough "worker tokens" are returned to the pool).
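A minimal sketch of that semaphore-based variant, assuming the golang.org/x/sync/semaphore package plus the context and time imports; the 200-token limit and the 30-second timeout are assumptions, and processEmailMessage is the same handler used in the consumer above.

// sem allows at most 200 emails in flight, regardless of how many emails
// a single message carries (assumed limit, matching the channel-based pool).
var sem = semaphore.NewWeighted(200)

// handleMessage grabs one worker token per email in the message and releases
// them when processing finishes, so a "fat" message consumes more of the budget.
func handleMessage(ctx context.Context, emails []string, body string) error {
	n := int64(len(emails))

	// Acquire blocks until n tokens are free, or returns early if ctx is cancelled.
	if err := sem.Acquire(ctx, n); err != nil {
		return err
	}
	defer sem.Release(n)

	// Impose a per-message timeout so a stuck handler cannot hold tokens forever.
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // assumed timeout
	defer cancel()

	for _, email := range emails {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			processEmailMessage(email, body) // same handler as in the consumer above
		}
	}
	return nil
}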

Dead lettering and eventual consistency

When something fails, there can be two types of errors: retryable and non-retryable.

A non-retryable error usually means a bug in our code, which will produce the same error result for a given input.

A retryable error means something is temporarily broken and will eventually become available again at some later point, like an email provider that was down for a couple of minutes but then recovered.

What does it mean for our message processing?

At first glance, there is a temptation to configure our queue to process messages with a 100% success rate by allowing infinite retries, i.e. repeating until everything is ok.

While this can help with retryable errors, it creates problems when we face non-retryable ones, especially when processing a message takes down our consumer due to an unrecoverable error (a panic). These are called poison messages, and they can take down your entire cluster of consumers, one at a time.

To mitigate that, there is a widely known technique called "dead lettering", which basically consists of two parts: a retry limit and a side queue.

The idea is simple: when something fails more than the specified number of times, it is transferred to a separate queue (which is not processed by our worker).
Dead lettering is usually configured on the broker side, so it does not depend on our implementation and can be considered a robust error-handling solution.
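For SQS specifically, this broker-side configuration is a redrive policy on the source queue. A minimal sketch with aws-sdk-go, reusing svc and queueURL from above; the dead letter queue ARN and the maxReceiveCount of 5 are placeholder values.

// configureDeadLettering attaches a dead letter queue to the main queue:
// after maxReceiveCount failed receives, SQS moves the message to the DLQ
// instead of redelivering it forever. The ARN and the limit are placeholders.
func configureDeadLettering() error {
	redrivePolicy := `{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:emails-dlq","maxReceiveCount":"5"}`
	_, err := svc.SetQueueAttributes(&sqs.SetQueueAttributesInput{
		QueueUrl: &queueURL,
		Attributes: map[string]*string{
			sqs.QueueAttributeNameRedrivePolicy: aws.String(redrivePolicy),
		},
	})
	return err
}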

What to do with the “dead letters”?

It all depends on the application requirements. For example, if you have an error budget for tasks (you can allow some percentage of messages to go unprocessed), you can simply set up error notifications for when something lands in the dead letter queue, and then decide what to do with it.

When the requirements are strict and you must process every message, eventual consistency comes into play. That means the dead letter queue should be consumed as well in order to reach the desired system state.

So, alongside the standard consumer, we can write a CLI tool, for example, which processes the dead letter queue using the same interface as the regular consumer; this gives us an instrument to recover from failures, as sketched below.

Working with the dead letter queue
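A sketch of such a recovery tool: it is essentially the regular consumer pointed at the dead letter queue. The flag name is an assumption, and it reuses consumeConcurrent plus the globals and imports (session, sqs, aws, log, and additionally flag) from the snippets above.

// A small CLI that drains the dead letter queue through the exact same
// processing path as the regular consumer.
func main() {
	dlqURL := flag.String("queue-url", "", "URL of the dead letter queue to drain")
	flag.Parse()
	if *dlqURL == "" {
		log.Fatal("missing -queue-url")
	}

	sess, err := session.NewSession(&aws.Config{Region: aws.String(AWS_REGION)})
	if err != nil {
		log.Fatal(err)
	}
	svc = sqs.New(sess)

	// Point the shared queueURL at the DLQ and run the same consumer loop,
	// so dead letters go through exactly the same processing code path.
	queueURL = *dlqURL
	consumeConcurrent()
}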

Afterword

Working with queues is usually challenging, but it is very beneficial and greatly contributes to system stability. Imposing proper limits and implementing proper error handling can mitigate most of the problems we may encounter with this approach.

References

https://docs.microsoft.com/en-us/azure/architecture/microservices/model/domain-analysis

https://docs.microsoft.com/en-us/azure/architecture/microservices/design/interservice-communication