Implementing Request Parking in a Go Service with Redis

Pandula Weerasooriya

Vector lines by starline on Freepik

This short article explains how to use Redis to synchronize multiple concurrent requests in a web service. Although I’ve used Go as my back-end language, the approach can easily be implemented in other languages as well.

Problem overview

I work at a company where we regularly process a large number of user transactions for our financial wellness app. When a user requests their transactions in the front end, we start a process that pulls transactions from upstream third-party APIs, enriches them with extra information, checks for possible recurring expenses and so on, saves the processed information in the database (DB), and returns the results to the user.

Unfortunately, this process can take up to 20 or 30 seconds, so we wanted to run it only once per period of time (such as 30 minutes). Requests that arrive within this cache interval should be served the stored, possibly stale, information from the DB.

Here’s a look at the first iteration.

func (s *Service) GetTransactions(ctx context.Context, uid string) ([]model.Transaction, error) {
	// Read the cacheOnly flag from Redis. A missing key leaves cacheOnly false.
	var cacheOnly bool
	if err := s.cache.Get(ctx, uid, &cacheOnly); err != nil && err != cache.ErrNotFound {
		logger.Logger().Warn().Err(err).Msgf("failed to get the key for uid %s", uid)
	}

	// Inside the cache period: return the stored results from the DB.
	if cacheOnly {
		return s.getTxsFromDb(ctx, uid)
	}

	// Time-consuming workload that can take many seconds.
	txs, err := s.processTransactions(ctx, uid)
	if err != nil {
		return nil, err
	}

	// Start the cache-only period for this user.
	if err = s.cache.Put(ctx, uid, true, CachePeriod); err != nil {
		logger.Logger().Warn().Err(err).Msgf("failed to set the 30 min cache flag for %s", uid)
	}

	return txs, nil
}

For each user ID (uid), we keep a boolean cacheOnly flag. Whenever a request hits the GetTransactions method, it first checks this flag to decide whether to return results from the DB or run the full transaction processing.

This is where we found the core problem. GetTransactions can be called multiple times while a user's transactions are still being processed. This can happen because:

  • When a user first loads the home screen, it loads several components such as budgets, bills and trackers, and each of these components calls GetTransactions internally.
  • Some front-end components were poorly designed, so component re-renders triggered multiple calls to GetTransactions.

Below is a visual representation of the problem.

drawn using draw.io

Requests that arrive during processing check the cache flag first and, because it hasn’t been set yet, they also begin processing the transactions. (Note that all of these requests update the cache flag once their processing finishes, but I omitted that for brevity.)

This unwarranted transaction processing is hugely detrimental to the system as a whole because:

  • It uses up a lot of the service’s CPU and memory. Because many updates and inserts hit the database in quick succession and contend for locks, it can slow down the database as well.
  • The large number of third-party API calls quickly adds up on the API bill.
  • It increases the delay before users see updates in the various components of the app.
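To make the race concrete, here is a minimal in-memory sketch of the check-then-process pattern from the first iteration. Nothing in it talks to Redis: flagStore and simulateNaive are hypothetical names introduced only for this illustration, and the barrier deliberately forces the worst-case interleaving in which every request reads the flag before any request sets it.

```go
package main

import "sync"

// flagStore is a hypothetical in-memory stand-in for the Redis cacheOnly flag.
type flagStore struct {
	mu    sync.Mutex
	flags map[string]bool
}

func (f *flagStore) get(uid string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.flags[uid]
}

func (f *flagStore) put(uid string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.flags[uid] = true
}

// simulateNaive fires n concurrent requests for one user and returns how many
// of them ran the expensive processing step.
func simulateNaive(n int) int {
	store := &flagStore{flags: map[string]bool{}}
	var countMu sync.Mutex
	processed := 0
	var checked, done sync.WaitGroup
	checked.Add(n)
	done.Add(n)

	for i := 0; i < n; i++ {
		go func() {
			defer done.Done()
			cacheOnly := store.get("user-1")
			// Barrier: every request has read the flag before any request
			// gets far enough to set it.
			checked.Done()
			checked.Wait()
			if cacheOnly {
				return // would be served from the DB
			}
			countMu.Lock()
			processed++ // stands in for processTransactions
			countMu.Unlock()
			store.put("user-1")
		}()
	}
	done.Wait()
	return processed
}
```

With this forced interleaving, simulateNaive(5) counts five processing runs for a single user, which is exactly the duplicated work described above.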

Ideally, we would park the later requests until the first request has finished processing, as illustrated below.

drawn using draw.io

For a problem like this, a global lock wouldn’t cut it, since we want to park requests per user, and each user's processing is independent of every other user's. So, instead of trying to build a locking mechanism in Go itself, we decided to leverage Redis, as it was already being used in other parts of the service.

But before I reveal the solution, there are a couple of Redis operations that you need to be familiar with.

Redis Commands

SETNX

In Redis, SETNX stands for "Set if Not Exists." It is a command used to set the value of a key in the Redis database only if the key does not already exist. This command is often used to implement a simple form of distributed locking in a multi-threaded or distributed environment.

When used as a lock, the idea is to use a specific key as a lock identifier. Multiple processes or threads can attempt to acquire the lock by trying to execute the SETNX command on that key. If the key doesn't exist (meaning the lock is not acquired), the process succeeds in setting the key and acquires the lock. If the key already exists (lock is already acquired by another process), the process fails to set the key and doesn't acquire the lock.

Here’s a basic example of how SETNX can be used as a lock in a distributed environment:

  1. Thread A tries to acquire the lock:

SETNX lock_key true

If the return value is 1, Thread A has acquired the lock. If the return value is 0, the lock is already held by another thread.

  2. Thread A runs its critical section of code while holding the lock.

  3. Thread A releases the lock when it’s done:

DEL lock_key

Pub / Sub

Redis Pub/Sub (Publish/Subscribe) is a messaging mechanism that allows different parts of an application or different applications to communicate with each other in a loosely coupled manner. It’s a way for multiple clients to exchange messages through channels without knowing anything about each other.

You can:

  • Subscribe to a channel, registering yourself as a listener on it.
  • Publish messages to a channel, so that every subscriber currently listening on it receives them.
  • Unsubscribe from a channel when you no longer need to listen to it.

Note that Redis Pub/Sub doesn’t persist messages: a client that subscribes after a message was published never sees that message. It’s better suited to real-time communication than to message storage.
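The fire-and-forget behaviour is the part that matters most for the solution, so here is a small in-memory imitation of it. The hub type and its methods are hypothetical and exist only for this sketch; they are built on Go channels and do not reflect how Redis implements Pub/Sub internally.

```go
package main

import "sync"

// hub is a hypothetical in-memory imitation of fire-and-forget pub/sub.
type hub struct {
	mu   sync.Mutex
	subs map[string][]chan string
}

func newHub() *hub {
	return &hub{subs: map[string][]chan string{}}
}

// subscribe registers a listener on the named channel and returns the Go
// channel on which its messages arrive.
func (h *hub) subscribe(name string) <-chan string {
	ch := make(chan string, 1)
	h.mu.Lock()
	defer h.mu.Unlock()
	h.subs[name] = append(h.subs[name], ch)
	return ch
}

// publish delivers msg to the subscribers listening right now and returns how
// many of them received it. Nothing is stored for future subscribers.
func (h *hub) publish(name, msg string) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for _, ch := range h.subs[name] {
		select {
		case ch <- msg:
			delivered++
		default: // subscriber is not keeping up: the message is dropped
		}
	}
	return delivered
}
```

Publishing to a channel with no listeners reaches nobody, and a listener that subscribes afterwards receives nothing from the past. For request parking this means a waiting request has to be subscribed before the done message is sent, or it will only be released by its timeout.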

Solution

Here’s how we implemented the solution using SETNX and the Pub/Sub feature.

  1. When the first request for a user comes in, it calls SETNX, which acts as the lock acquisition.
  2. It then checks whether the cacheOnly flag is true. If cacheOnly is false, it starts the transaction processing workload.
  3. If other requests come in during the interim, they also try to take the lock by calling SETNX and fail. Once this happens, those requests subscribe to a channel and wait until a done message is published to that channel.
  4. Once the first request finishes its work, it saves the results in the DB, sets the cacheOnly flag to true and publishes a message on the channel that the subscribers are waiting on.
  5. Once a message is published to the channel, the requests waiting on the channel are released. Each of them checks the cacheOnly flag, finds it to be true, and reads from the DB instead of processing the transactions again.
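Before the real code, the five steps can be exercised end to end with a small in-memory simulation. Everything here is an assumption made for illustration: fakeRedis, handle and simulate are hypothetical names, the key and channel names are invented, and no Redis server is involved. One deliberate simplification is that only the request holding the lock releases it and publishes.

```go
package main

import (
	"sync"
	"time"
)

// fakeRedis is a hypothetical in-memory stand-in for Redis keys and pub/sub.
type fakeRedis struct {
	mu   sync.Mutex
	keys map[string]bool
	subs map[string][]chan struct{}
}

// setNX sets key only if it is absent and reports whether it did so.
func (r *fakeRedis) setNX(key string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.keys[key] {
		return false
	}
	r.keys[key] = true
	return true
}

func (r *fakeRedis) set(key string, val bool) { // set(key, false) plays the role of DEL
	r.mu.Lock()
	defer r.mu.Unlock()
	r.keys[key] = val
}

func (r *fakeRedis) get(key string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.keys[key]
}

func (r *fakeRedis) subscribe(name string) <-chan struct{} {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch := make(chan struct{})
	r.subs[name] = append(r.subs[name], ch)
	return ch
}

// publish wakes every request currently subscribed to name.
func (r *fakeRedis) publish(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, ch := range r.subs[name] {
		close(ch)
	}
	delete(r.subs, name)
}

// handle walks one request through steps 1-5 and reports whether it processed.
func handle(r *fakeRedis, uid string, process func()) bool {
	lockKey, channel := uid+"-lock", uid+":ch"
	if r.setNX(lockKey) { // step 1: this request took the lock
		defer func() { // step 4: release the lock, then wake parked requests
			r.set(lockKey, false)
			r.publish(channel)
		}()
	} else { // step 3: park until the done message or a timeout
		select {
		case <-r.subscribe(channel):
		case <-time.After(500 * time.Millisecond):
		}
	}
	if r.get(uid) { // steps 2 and 5: cacheOnly is set, serve from the DB
		return false
	}
	process()
	r.set(uid, true) // step 4: start the cache period
	return true
}

// simulate sends n concurrent requests for one user and counts processing runs.
func simulate(n int) int {
	r := &fakeRedis{keys: map[string]bool{}, subs: map[string][]chan struct{}{}}
	var wg sync.WaitGroup
	var mu sync.Mutex
	runs := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			handle(r, "user-1", func() {
				time.Sleep(20 * time.Millisecond) // pretend to do slow work
				mu.Lock()
				runs++
				mu.Unlock()
			})
		}()
	}
	wg.Wait()
	return runs
}
```

As long as the timeout is longer than the processing time, simulate(8) should count a single processing run: whichever request wins the lock does the work, and every other request finds the cacheOnly flag set by the time it looks.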

Below is the Go code for the full solution.

func (s *Service) GetTransactions(ctx context.Context, uid string) ([]model.Transaction, error) {
	// Try to take the per-user lock; only one request at a time succeeds.
	lockKey := fmt.Sprintf("%s-lock", uid)
	unlockedForReading, err := s.cache.SetNx(ctx, lockKey, true, MaxProcessTime)
	if err != nil {
		logger.Logger().Warn().Err(err).Msgf("failed to set the lock flag for %s", uid)
	}

	// Per-user pub/sub channel. It is declared before the deferred function
	// because that function refers to it. (b is not defined in this excerpt.)
	channel := uid + ":" + string(b) + ":ch"

	// Delete the lock key and notify waiters whenever the function returns, so
	// that we won't be blocking any other requests. A fresh context is used so
	// the cleanup still runs if the request context was cancelled.
	defer func() {
		cleanupCtx := context.Background()
		if err := s.cache.Del(cleanupCtx, lockKey); err != nil {
			logger.Logger().Warn().Err(err).Msgf("failed to delete the lock key for uid %s", uid)
		}
		// Publish to the pub/sub channel so that any requests blocked on the
		// subscription are released.
		if err := s.ch.Publish(cleanupCtx, channel, true); err != nil {
			logger.Logger().Warn().Err(err).Msgf("failed to publish to channel for uid %s", uid)
		}
	}()

	// If the key is locked, subscribe to the channel and wait.
	if !unlockedForReading && err == nil {
		sub := s.ch.Subscribe(ctx, channel)
		defer sub.Close()

		// Block this request until the current request is done
		// or the timeout elapses.
		select {
		case <-sub.Channel():
		case <-time.After(60 * time.Second):
		}
	}

	// Read the cacheOnly flag from Redis. A missing key leaves cacheOnly false.
	var cacheOnly bool
	if err = s.cache.Get(ctx, uid, &cacheOnly); err != nil && err != cache.ErrNotFound {
		logger.Logger().Warn().Err(err).Msgf("failed to get the key for uid %s", uid)
	}

	// Inside the cache period: return the stored results from the DB.
	if cacheOnly {
		return s.getTxsFromDb(ctx, uid)
	}

	// Time-consuming workload that can take many seconds.
	txs, err := s.processTransactions(ctx, uid)
	if err != nil {
		return nil, err
	}

	// Start the cache-only period for this user.
	if err = s.cache.Put(ctx, uid, true, CachePeriod); err != nil {
		logger.Logger().Warn().Err(err).Msgf("failed to set the 30 min cache flag for %s", uid)
	}

	return txs, nil
}

A few things to note here:

  • I’m unlocking (calling DEL on the lock key) and publishing the done message in a deferred function so that, if a request unexpectedly crashes or returns early, the requests that are waiting are still released.
  • Redis does not return an error when some commands are repeated. For example, every request calls DEL on the lock key when it finishes, and deleting a key that no longer exists is harmless: DEL simply reports that zero keys were removed.
  • We use Go’s select statement to wait on the Redis channel. We also added a timeout, so that parked requests cannot wait forever.
  • You might wonder why we still need the cacheOnly flag when the lock key is already there. Consider a request that arrives in the middle of the cache period (say, 30 minutes), 10 minutes after the flag was first set. By then the lock has long been released, so without the flag nothing would prevent that request from starting the transaction processing all over again. The cacheOnly flag ensures that no processing takes place during the cache period; the lock key only prevents requests from starting concurrent processing.

Hopefully, you will find this helpful if you ever run into, or have already encountered, a similar issue. Until then, happy coding!