
How We Scaled to 100 Million Active Users Using Kafka and Golang — Eventual Consistency

Mohammad Hoseini Rad

Nowadays, the most popular startups can reach millions of users in less than a year. In my experience as a software developer, during which I had the privilege of working on a couple of them, the most common bottleneck in a backend service is I/O overhead. In this article, we will discuss eventual consistency and how it helps us overcome I/O bottlenecks at scale with Kafka.

How simplicity allows Kafka to scale up to millions of messages per second

A Kafka cluster consists of multiple servers, called brokers in the Kafka vocabulary, and one or more topics.

What is a topic in Kafka? A topic is a stream of related messages. The payload can be an event, JSON, or any sequence of bytes. For instance, we had topics for published posts, comments, and likes, as well as action logs to study user experiences.

How is a topic structured in a cluster? Topics have two main properties: the number of partitions and the replication factor. The replication factor determines how many copies of a topic are kept, which increases resiliency in case of broker failures. Before describing partitions, let’s follow a message's journey from its publication to a topic until it is received and committed by a consumer.

First, we have a happy message published by a producer going toward a topic. What will happen when the topic receives the message? A topic is not just one queue. It consists of one or more queues that we call partitions. These partitions can be located in one or more different brokers.

When a message is published, the producer's partitioner decides which partition it goes to. We can configure this behavior with message keys and distribution strategies: with the default hash-based strategy, messages that share a key land in the same partition. Alternatively, you can let Kafka spread your messages evenly among partitions.
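To make keyed distribution concrete, here is a minimal sketch of how a key can be mapped to a partition. It assumes an FNV-1a hash and a hypothetical `partitionFor` helper; real Kafka clients ship their own hash functions, so the actual partition numbers will differ. The property that matters is the same: equal keys always land in the same partition.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to one of numPartitions partitions.
// Assumption: FNV-1a is used for illustration only; it expects numPartitions > 0.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key is always routed to the same partition.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("key=%s -> partition %d\n", key, partitionFor(key, 3))
	}
}
```

Choosing a key such as the user ID keeps all events of one user in order, because ordering is only guaranteed inside a single partition.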

Each consumer can read from one or more partitions. The key point is that, within a consumer group, no more than one consumer can read from a partition. What does that mean?

If a group has more consumers than partitions, the excess consumers will sit idle! At first, I thought this was going to limit us a lot. But this behavior allows Kafka to scale partitions out independently and achieve millions of messages per second.
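The relationship between consumers and partitions can be sketched with a simple round-robin assignment. This is an illustration under assumptions, not Kafka's actual assignor (Kafka offers several assignment strategies), and the `assign` and `idle` helpers are hypothetical names. It shows that each partition has exactly one owner and that consumers beyond the partition count receive nothing.

```go
package main

import "fmt"

// assign distributes partition IDs 0..partitions-1 over consumers round-robin.
// Each partition gets exactly one owner; a consumer may own several partitions.
// It expects consumers > 0.
func assign(partitions, consumers int) map[int][]int {
	owned := make(map[int][]int, consumers)
	for c := 0; c < consumers; c++ {
		owned[c] = nil // list every consumer, even if it ends up idle
	}
	for p := 0; p < partitions; p++ {
		c := p % consumers
		owned[c] = append(owned[c], p)
	}
	return owned
}

// idle counts the consumers that received no partition.
func idle(owned map[int][]int) int {
	n := 0
	for _, parts := range owned {
		if len(parts) == 0 {
			n++
		}
	}
	return n
}

func main() {
	fmt.Println("3 partitions, 5 consumers, idle:", idle(assign(3, 5)))
	fmt.Println("6 partitions, 2 consumers, idle:", idle(assign(6, 2)))
}
```

In practice this means the partition count sets the upper bound on parallelism for a consumer group, so it is worth choosing it with future growth in mind.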

Eventual Consistency — In eventuality, we trust!

Let’s explore different scenarios: posting a new Instagram photo, a new tweet, or a new comment on Amazon, liking a photo, uploading a new avatar, or looking at a photo. Traditionally, when a client sends an HTTP request to upload a new photo, the backend holds that request until it either succeeds or fails. But there is another approach that fits well with asynchronous programming.

How long should it take until our followers receive a new tweet? 100 milliseconds, one second, one minute, or five minutes? As Twitter has mentioned in their articles, they are happy with less than five seconds. In our case, during request peaks and with far fewer resources than Twitter, we were happy with less than a minute. The key point is that in many scenarios, we are okay with seconds of delay, and we do not need to hold the user until the request is finished. Imagine waiting 5 seconds each time you tweet.
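The idea of not holding the user can be sketched with a handler that only enqueues the work and answers immediately. This is a minimal illustration using the standard library: the buffered channel stands in for a Kafka topic, and `acceptHandler` and `tryPost` are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// acceptHandler enqueues the request body and answers right away with
// 202 Accepted, leaving the real work to whoever drains the queue.
func acceptHandler(queue chan<- string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "cannot read body", http.StatusBadRequest)
			return
		}
		select {
		case queue <- string(body):
			w.WriteHeader(http.StatusAccepted)
		default:
			// The queue is full: shed load instead of blocking the client.
			http.Error(w, "busy, try again", http.StatusServiceUnavailable)
		}
	}
}

// tryPost sends one request through the handler and reports the status code.
func tryPost(queue chan string, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/post", strings.NewReader(body))
	rec := httptest.NewRecorder()
	acceptHandler(queue).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	queue := make(chan string, 1)
	fmt.Println(tryPost(queue, "first"))  // the queue has room
	fmt.Println(tryPost(queue, "second")) // the queue is full
}
```

The client gets its answer as soon as the message is queued, while the time spent on the actual write moves out of the request path.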

In other scenarios, like gathering action logs to compute user behaviors later, we do not need messages instantly. Hence, making users wait until the application saves their log makes no sense. Besides that, Kafka has powerful integrations with other tools, such as ELK and Flink, to work on streams of messages.

The second thing that makes an application slow is I/O bottlenecks. Assume we need to build a blogging service. By separating reads from writes, we can scale databases independently. For instance, we can have a relational database that supports transactions for insert queries, and after the data has been saved, the service publishes a new event that synchronizes the read databases. Read databases, therefore, don’t need to be as complicated as write databases and can be optimized just for read queries.

One of the most useful techniques that we used to handle around 10 million requests per hour was utilizing Redis as the read database. I was working on services that directly respond to GET requests for editorial pages. We processed every page and then saved it in Redis. Whenever a page changed, a Kafka event was triggered to update the cache. Therefore, Redis was always up to date, and we were able to answer requests as fast as possible.

Why do we have two different topics? Because in many cases, your team is only in charge of one part of the page, and other teams are updating other entities, and your team doesn’t need to be worried about it. Using a message broker provides decoupling, less complexity in services, and resilience and scalability by eventual consistency.

Let’s Build a Blogging Service

Let’s build a sample service that utilizes Kafka and Redis to implement eventual consistency. While setting up Kafka and Redis in the development environment is not that challenging, having a resilient and scalable Kafka and Redis cluster on the cloud requires years of experience.

We will be using Upstash’s free Kafka and Redis clusters in this article. Their free-tier Kafka and Redis clusters support up to 10k messages per day, which is more than enough for a development environment.

To create your cluster, create a new Upstash account, go to the Kafka tab, and click on the create cluster button.

As you can see, you can select between multiple regions and choose the replication factor. We are going to use this cluster for the testing environment, so I’ll leave it on Single Replica.

After creating the cluster, you can add new topics to it. As you can see, you can configure partition count, message size, and even retention configurations. Retention configurations tell Kafka when to delete old messages to free up space, either by discarding messages older than a given age or by deleting the oldest messages whenever the topic reaches a certain size.
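The two retention rules can be sketched on an in-memory log. This is a simplified model under assumptions: the `record` type and `applyRetention` helper are hypothetical, timestamps are plain Unix seconds, and real Kafka removes whole log segments rather than individual records.

```go
package main

import "fmt"

// record is a simplified log entry.
type record struct {
	at   int64 // creation time, Unix seconds
	size int   // payload size in bytes
}

// applyRetention first drops records older than maxAge seconds, then drops
// the oldest remaining records until the total size fits within maxBytes.
// The log must be ordered oldest first.
func applyRetention(log []record, now, maxAge int64, maxBytes int) []record {
	// Time-based retention.
	for len(log) > 0 && now-log[0].at > maxAge {
		log = log[1:]
	}
	// Size-based retention.
	total := 0
	for _, r := range log {
		total += r.size
	}
	for len(log) > 0 && total > maxBytes {
		total -= log[0].size
		log = log[1:]
	}
	return log
}

func main() {
	log := []record{{0, 100}, {50, 100}, {90, 100}, {95, 100}}
	kept := applyRetention(log, 100, 60, 250)
	fmt.Println("records kept:", len(kept))
}
```

Whichever limit is hit first wins, so a consumer that falls too far behind can miss messages that were already deleted.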

Defining a strategy using eventual consistency.

Let’s walk through it:

  • Whenever the New Post API receives a new post, it puts it on the app.newPosts topic.
  • The Publisher service consumes new messages and saves them in a persistent database like PostgreSQL. Then it puts the details on the app.publishedPosts topic.
  • The CacheManager service consumes several topics, one of which is app.publishedPosts. It updates the Redis cache accordingly.
  • Whenever there is a new Get Post request, the service replies with the data stored in Redis.
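Before wiring up real infrastructure, the flow above can be sketched entirely in memory. In this illustration, channels stand in for the two Kafka topics and maps stand in for PostgreSQL and Redis; the `runPipeline` function and the `post` type are hypothetical names used only here.

```go
package main

import (
	"fmt"
	"sync"
)

type post struct{ UID, Title string }

// runPipeline pushes titles through the API -> Publisher -> CacheManager flow
// and returns the cache once both background workers have finished.
func runPipeline(titles []string) map[string]post {
	newPosts := make(chan post)       // stands in for the new-posts topic
	publishedPosts := make(chan post) // stands in for the published-posts topic
	db := map[string]post{}           // write side, touched only by the publisher
	cache := map[string]post{}        // read side, touched only by the cache manager

	var wg sync.WaitGroup
	wg.Add(2)

	// Publisher: persist first, then announce the published post.
	go func() {
		defer wg.Done()
		defer close(publishedPosts)
		for p := range newPosts {
			db[p.UID] = p
			publishedPosts <- p
		}
	}()

	// CacheManager: keep the read model in sync with published posts.
	go func() {
		defer wg.Done()
		for p := range publishedPosts {
			cache[p.UID] = p
		}
	}()

	// API: hand the posts over and move on.
	for i, t := range titles {
		newPosts <- post{UID: fmt.Sprintf("uid-%d", i), Title: t}
	}
	close(newPosts)

	wg.Wait() // from here on, the cache has caught up with the writes
	return cache
}

func main() {
	cache := runPipeline([]string{"hello", "world"})
	fmt.Println(len(cache), cache["uid-1"].Title)
}
```

Between handing a post to the first channel and `wg.Wait()` returning, the cache may lag behind the database. That window is the eventual part of eventual consistency.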

Let’s implement it with Golang.

Beforehand, we need to define the Post model to save in the database as well as contracts between different services on Kafka messages.

The database model:

type Post struct {
	UID     string `json:"uid" gorm:"primaryKey"`
	Title   string `json:"title"`
	Content string `json:"content"`
	Slug    string `json:"slug" gorm:"uniqueIndex"`
}

And contracts:

type NewPostMessage struct {
	UID     string `json:"uid"`
	Title   string `json:"title"`
	Content string `json:"content"`
}

type PublishedPostMessage struct {
	Post
}

Then, we need to implement the publisher service.

type Publisher struct {
	newPostReader       *kafka.Reader
	publishedPostWriter *kafka.Writer
	db                  *gorm.DB
}

As you can see, it needs to consume messages from the app.newPosts topic and, after saving them, publish them to the app.publishedPosts topic.

func NewPublisher() (*Publisher, func()) {
	// we should use dependency injection here, but the point of the article is something else, therefore, for
	// simplicity, I initiated all dependencies here.
	p := &Publisher{}
	mechanism, err := scram.Mechanism(scram.SHA256, "", "")
	if err != nil {
		log.Fatalln(err)
	}

	// setup database
	dsn := "host=localhost user= password= dbname=posts sslmode=disable TimeZone=Asia/Tehran"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalln(err)
	}
	p.db = db
	if err := db.AutoMigrate(&Post{}); err != nil {
		log.Fatalln(err)
	}

	// setup kafka
	dialer := &kafka.Dialer{SASLMechanism: mechanism, TLS: &tls.Config{}}
	p.newPostReader = kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{""},
		Topic:   "app.newPosts",
		GroupID: "service.publisher",
		Dialer:  dialer,
	})
	p.publishedPostWriter = kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{""},
		Topic:   "app.publishedPosts",
		Dialer:  dialer,
	})

	// the returned function releases the Kafka connections
	return p, func() {
		p.newPostReader.Close()
		p.publishedPostWriter.Close()
	}
}

Need your cluster credentials, or not sure how to connect to the cluster in other languages? Go to your Upstash panel, and you can find everything you need.

Now the Run method.

func (p *Publisher) Run() {
	ctx := context.Background()
	for {
		newPost, err := p.newPostReader.FetchMessage(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return // the reader has been closed
			}
			log.Fatalln(err)
		}

		var post NewPostMessage
		if err := json.Unmarshal(newPost.Value, &post); err != nil { // a dead letter queue may be a better solution
			log.Printf("decoding new post error: %s\n", err.Error())
			continue
		}

		postModel := Post{
			UID:     post.UID,
			Title:   post.Title,
			Content: post.Content,
			Slug:    slug.Make(post.Title + "-" + time.Now().Format(time.Stamp)),
		}
		if err := p.db.Create(&postModel).Error; err != nil {
			log.Printf("saving new post in database: %s\n", err.Error())
			continue // never announce a post that was not saved
		}
		if err := p.newPostReader.CommitMessages(ctx, newPost); err != nil {
			log.Printf("committing the %s post: %s\n", post.UID, err.Error())
		}

		b, _ := json.Marshal(PublishedPostMessage{Post: postModel})
		if err := p.publishedPostWriter.WriteMessages(ctx, kafka.Message{Value: b}); err != nil {
			log.Printf("publishing the %s post: %s\n", post.UID, err.Error())
			continue
		}
		log.Printf("the %s post has been saved in the database\n", post.UID)
	}
}

As you can see, it reads messages from the app.newPosts topic and, after saving them in the database, writes them to the app.publishedPosts topic.
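The Run method only logs messages it cannot decode, and its comment hints that a dead letter queue may be a better option. Here is a minimal sketch of that routing idea, using the standard library only. The `route` function and its two callbacks are hypothetical; in a real service, the dead-letter callback would probably write the raw bytes to a separate Kafka topic.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type newPostMessage struct {
	UID   string `json:"uid"`
	Title string `json:"title"`
}

// route decodes each raw message. Valid messages go to handle; messages that
// cannot be decoded go to deadLetter together with the decode error, so they
// can be inspected or replayed later instead of being silently dropped.
func route(raw [][]byte, handle func(newPostMessage), deadLetter func([]byte, error)) {
	for _, value := range raw {
		var msg newPostMessage
		if err := json.Unmarshal(value, &msg); err != nil {
			deadLetter(value, err)
			continue
		}
		handle(msg)
	}
}

func main() {
	raw := [][]byte{[]byte(`{"uid":"1","title":"ok"}`), []byte(`{broken`)}
	var handled, dead int
	route(raw,
		func(newPostMessage) { handled++ },
		func([]byte, error) { dead++ },
	)
	fmt.Println("handled:", handled, "dead-lettered:", dead)
}
```

Keeping bad messages in their own place means one malformed payload does not block the partition, and nothing is lost for later debugging.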

Now the cache manager service:

type CacheManager struct {
	publishedPostReader *kafka.Reader
	rdb                 *redis.Client
}

func NewCacheManager() (*CacheManager, func()) {
	// we should use dependency injection here, but the point of the article is something else, therefore, for
	// simplicity, I initiated all dependencies here.
	cm := &CacheManager{}
	mechanism, err := scram.Mechanism(scram.SHA256, "", "")
	if err != nil {
		log.Fatalln(err)
	}

	// setup Redis
	opt, err := redis.ParseURL("redis://default:PASSWORD@SERVER:PORT")
	if err != nil {
		log.Fatalln(err)
	}
	cm.rdb = redis.NewClient(opt)

	// setup kafka
	dialer := &kafka.Dialer{SASLMechanism: mechanism, TLS: &tls.Config{}}
	cm.publishedPostReader = kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{""},
		Topic:   "app.publishedPosts",
		GroupID: "service.cacheManager",
		Dialer:  dialer,
	})

	return cm, func() {
		cm.publishedPostReader.Close()
		cm.rdb.Close()
	}
}

func (c *CacheManager) Run() {
	ctx := context.Background()
	for {
		publishedPost, err := c.publishedPostReader.FetchMessage(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return // the reader has been closed
			}
			log.Fatalln(err)
		}

		var published PublishedPostMessage
		if err := json.Unmarshal(publishedPost.Value, &published); err != nil {
			log.Printf("decoding published post error: %s\n", err.Error())
			continue
		}

		b, _ := json.Marshal(published.Post)
		// an expiration of 0 means the key never expires
		if err := c.rdb.Set(ctx, "post:"+published.Slug, b, 0).Err(); err != nil {
			log.Printf("saving the %s post in Redis: %s\n", published.UID, err.Error())
			continue
		}
		if err := c.publishedPostReader.CommitMessages(ctx, publishedPost); err != nil {
			log.Printf("committing the %s post: %s\n", published.UID, err.Error())
		}
		log.Printf("the %s post has been saved in Redis\n", published.UID)
	}
}

You can also find your Redis cluster’s credentials in your Upstash panel.

Finally, let’s create a new service for the API:

type API struct {
	rdb           *redis.Client
	newPostWriter *kafka.Writer
}

func NewAPI() (*API, func()) {
	// we should use dependency injection here, but the point of the article is something else, therefore, for
	// simplicity, I initiated all dependencies here.
	p := &API{}
	mechanism, err := scram.Mechanism(scram.SHA256, "", "")
	if err != nil {
		log.Fatalln(err)
	}

	// setup Redis
	opt, err := redis.ParseURL("redis://default:PASSWORD@SERVER:PORT")
	if err != nil {
		log.Fatalln(err)
	}
	p.rdb = redis.NewClient(opt)

	// setup Kafka
	dialer := &kafka.Dialer{SASLMechanism: mechanism, TLS: &tls.Config{}}
	p.newPostWriter = kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{""},
		Topic:   "app.newPosts",
		Dialer:  dialer,
	})

	return p, func() {
		p.newPostWriter.Close()
		p.rdb.Close()
	}
}

// NewMessage returns the generated UID and error
func (a *API) NewMessage(title, content string) (string, error) {
	uid := shortid.MustGenerate()
	message := NewPostMessage{
		UID:     uid,
		Title:   title,
		Content: content,
	}
	b, _ := json.Marshal(message)
	return uid, a.newPostWriter.WriteMessages(context.Background(), kafka.Message{Value: b})
}

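// GetPost reads a post from Redis; it returns redis.Nil when the slug is not cached
func (a *API) GetPost(slug string) (Post, error) {
	var p Post
	tr := a.rdb.Get(context.Background(), "post:"+slug)
	b, err := tr.Bytes()
	if err != nil {
		return Post{}, err
	}
	if err := json.Unmarshal(b, &p); err != nil {
		return Post{}, err
	}
	return p, nil
}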
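GetPost answers only from Redis, so a post that has not reached the cache yet looks like a missing post. One possible way to handle that, sketched here under assumptions, is to fall back to the database on a cache miss and backfill the cache. The `store` interface, `mapStore` type, and `getWithFallback` function are hypothetical stand-ins for Redis and PostgreSQL, not part of the service above.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

type post struct{ Slug, Title string }

// store is a hypothetical lookup interface for the source of truth.
type store interface {
	Get(slug string) (post, error)
}

// mapStore is an in-memory stand-in used for both the cache and the database.
type mapStore map[string]post

func (m mapStore) Get(slug string) (post, error) {
	p, ok := m[slug]
	if !ok {
		return post{}, errNotFound
	}
	return p, nil
}

// getWithFallback reads from the cache first. Only on a cache miss does it
// query the database, and it backfills the cache when the post exists there.
func getWithFallback(cache mapStore, db store, slug string) (post, error) {
	p, err := cache.Get(slug)
	if err == nil {
		return p, nil
	}
	if !errors.Is(err, errNotFound) {
		return post{}, err // a real cache failure, not a miss
	}
	p, err = db.Get(slug)
	if err != nil {
		return post{}, err
	}
	cache[slug] = p
	return p, nil
}

func main() {
	db := mapStore{"hello": {Slug: "hello", Title: "Hello"}}
	cache := mapStore{}
	p, err := getWithFallback(cache, db, "hello")
	fmt.Println(p.Title, err, len(cache))
}
```

The trade-off is that misses now hit the write database, so this fits best when misses are rare. Otherwise the read path loses much of the isolation that the cache was meant to provide.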

Now that everything is ready, let’s finalize our application and build the HTTP API.

func main() {
	cm, closeCacheManager := services.NewCacheManager()
	defer closeCacheManager()
	p, closePublisher := services.NewPublisher()
	defer closePublisher()
	api, closeAPI := services.NewAPI()
	defer closeAPI()

	// run background services
	go cm.Run()
	go p.Run()

	// setup HTTP server
	e := echo.New()
	e.POST("/post", func(c echo.Context) error {
		title := c.Request().PostFormValue("title")
		content := c.Request().PostFormValue("content")
		_, err := api.NewMessage(title, content)
		if err != nil {
			return c.String(500, err.Error())
		}
		return c.String(201, "We have received your post and it will be published sooner or later.")
	})
	e.GET("/post/:slug", func(c echo.Context) error {
		post, err := api.GetPost(c.Param("slug"))
		if err != nil {
			if errors.Is(err, redis.Nil) {
				return c.String(404, "not found")
			}
			return c.String(500, err.Error())
		}
		return c.JSON(200, post)
	})
	// e.Start blocks until the server stops, so it runs on the main goroutine
	e.Logger.Fatal(e.Start(":1323"))
}

Let’s run and try it:

To access Redis, you can use either redis-cli or Upstash’s data browser tool.

Now we can easily access the post using its slug:

Conclusion

As you may have noticed, there is always a delay between submitting the new post form and updating the Redis cache. That delay is part of eventual consistency, and you should bear it in mind. The idea is simple, but it can easily add complexity to the code. To keep that under control, you can take a look at CQRS implementations.

In this article, I tried to simplify as much as I could. There are a lot of edge cases that I couldn’t cover in one article, such as: What should we do when we lose the entire Redis cluster? How do we avoid overpopulating Redis with unused data? What caching strategy should we use? What should we do in case of network failure and packet loss? What ensures the consistency of row structures in the cache? What is our strategy when there is a request for a post that hasn’t been saved in the cache yet?