
Quick Introduction to Using Golang for Kafka

Beatrice Leung

A walk-through of my short and basic program and what I learned from it.

To preface, I am a self-taught beginner at both Go and Kafka but would like to share this short tutorial that may be helpful to others starting out. This tutorial assumes a basic familiarity with Go and the concept of Kafka as an event-streaming platform. For more information on the latter, refer to this short article: https://kafka.apache.org/intro

This tutorial uses the segmentio/kafka-go library. My code is largely based on the examples in the documentation (https://pkg.go.dev/github.com/segmentio/[email protected]) and adds commentary. I will walk through a short, basic Go program that connects to a local Kafka server, writes messages, and reads them back.

I am running Kafka in Docker on macOS. ZooKeeper (or KRaft in newer Kafka versions) is also needed to manage cluster metadata such as topics.

First, check that Kafka is running (e.g. with docker ps in the CLI) and which port it is listening on. One way to check is to filter the listening ports in the terminal (set $PORT to the port to check, e.g. 9092):

lsof -i -P | grep LISTEN | grep :$PORT

The default port for Kafka is 9092, but it could be configured differently. There may be an entry like:

com.docke 80786 bbeatriceleung 168u IPv6 0xe7b7c887e8481c2d 0t0 TCP *:9092 (LISTEN)

The Program

Here is the complete program with minimal comments. The four user-defined functions called from main are explained below.

/* References: 
git: https://github.com/segmentio/kafka-go
doc: https://pkg.go.dev/github.com/segmentio/kafka-go#section-readme
*/
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	topic := "test-go-topic"
	partition := 0
	conn, err := connect(topic, partition)
	if err != nil {
		return //cannot continue without a connection
	}

	writeMessages(conn, []string{"msg 1", "msg 22", "msg 333"})
	readMessages(conn, 10, 10e3)
	readWithReader(topic, "consumer-through-kafka 1")

	if err := conn.Close(); err != nil {
		fmt.Println("failed to close connection:", err)
	}
} //end main

//Connect to the specified topic and partition in the server
func connect(topic string, partition int) (*kafka.Conn, error) {
	conn, err := kafka.DialLeader(context.Background(), "tcp",
		"localhost:9092", topic, partition)
	if err != nil {
		fmt.Println("failed to dial leader:", err)
	}
	return conn, err
} //end connect

//Writes the messages in the string slice to the topic
func writeMessages(conn *kafka.Conn, msgs []string) {
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

	for _, msg := range msgs {
		_, err := conn.WriteMessages(kafka.Message{Value: []byte(msg)})
		if err != nil { //check each write, not just the last
			fmt.Println("failed to write message:", err)
			return
		}
	}
} //end writeMessages

//Reads all messages in the partition from the start
//Specify a minimum and maximum size in bytes to read (1 char = 1 byte)
func readMessages(conn *kafka.Conn, minSize int, maxSize int) {
	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
	batch := conn.ReadBatch(minSize, maxSize) //in bytes

	msg := make([]byte, 10e3) //set the max length of each message
	for {
		msgSize, err := batch.Read(msg)
		if err != nil {
			break
		}
		fmt.Println(string(msg[:msgSize]))
	}

	if err := batch.Close(); err != nil { //make sure to close the batch
		fmt.Println("failed to close batch:", err)
	}
} //end readMessages

//Read from the topic using kafka.Reader
//Readers can use consumer groups (but are not required to)
func readWithReader(topic string, groupID string) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"}, //list every broker in the cluster here
		GroupID:  groupID,
		Topic:    topic,
		MaxBytes: 100, //maximum fetch size in bytes
		// more options are available
	})

	//Create a deadline and release its resources when done
	readDeadline, cancel := context.WithDeadline(context.Background(),
		time.Now().Add(5*time.Second))
	defer cancel()
	for {
		msg, err := r.ReadMessage(readDeadline)
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
	}

	if err := r.Close(); err != nil {
		fmt.Println("failed to close reader:", err)
	}
}

Running the program twice outputs:

beatrices-mbp:kafkaTest bbeatriceleung$ go run .
msg 1
msg 22
msg 333
message at topic/partition/offset test-go-topic/0/0: = msg 1
message at topic/partition/offset test-go-topic/0/1: = msg 22
message at topic/partition/offset test-go-topic/0/2: = msg 333
beatrices-mbp:kafkaTest bbeatriceleung$ go run .
msg 1
msg 22
msg 333
msg 1
msg 22
msg 333
message at topic/partition/offset test-go-topic/0/3: = msg 1
message at topic/partition/offset test-go-topic/0/4: = msg 22
message at topic/partition/offset test-go-topic/0/5: = msg 333

Notice that the "message at…" lines have steadily increasing offsets and do not repeat previously read messages on the second run, unlike the plain lines above them that simply print out the messages. The reason is explained later.

Connecting to the server

//Connect to the specified topic and partition in the server
func connect(topic string, partition int) (*kafka.Conn, error) {
	conn, err := kafka.DialLeader(context.Background(), "tcp",
		"localhost:9092", topic, partition) //creates the topic if it does not exist
	if err != nil {
		fmt.Println("failed to dial leader:", err)
	}
	return conn, err
} //end connect

If there are issues connecting to the Kafka server, check that the correct port is being passed to kafka.DialLeader(). This library function returns a connection to the specified (case-sensitive) topic and partition, and most of the rest of the program depends on that connection being established.

The topic is automatically created if it does not already exist. The library documentation includes an example of creating topics manually, but it is relatively cumbersome. However, I have not found another way to create a topic with more than one partition using this Go client, as passing a partition other than 0 into DialLeader() makes the program hang.
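For reference, here is a sketch of that manual approach, adapted from the library's readme: topic creation must be sent to the cluster controller, and kafka.TopicConfig lets you request more than one partition. The topic name test-go-topic-multi is my own placeholder, and this assumes the same single broker on localhost:9092.

```go
package main

import (
	"fmt"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Dial any broker first; topic creation must go through the controller.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		fmt.Println("failed to dial:", err)
		return
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		fmt.Println("failed to get controller:", err)
		return
	}
	ctrlConn, err := kafka.Dial("tcp",
		net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		fmt.Println("failed to dial controller:", err)
		return
	}
	defer ctrlConn.Close()

	// Request 3 partitions instead of the single one DialLeader would create.
	err = ctrlConn.CreateTopics(kafka.TopicConfig{
		Topic:             "test-go-topic-multi",
		NumPartitions:     3,
		ReplicationFactor: 1,
	})
	if err != nil {
		fmt.Println("failed to create topic:", err)
	}
}
```

This requires a running broker, so treat it as a starting point rather than something to run as-is.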

The context package is part of the standard library and can be used to set deadlines by which other functions/methods must finish executing. The rest of the program keeps setting such deadlines so that calls time out after a specified duration. context.Background() is an empty but non-nil context with no deadline. For more information, refer to its documentation at https://pkg.go.dev/[email protected].

Writing Messages to the Partition

//Writes the messages in the string slice to the end of the partition
func writeMessages(conn *kafka.Conn, msgs []string) {
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) //optional

	for _, msg := range msgs { //iterate over each msg in the slice
		_, err := conn.WriteMessages( //write to the partition
			kafka.Message{Value: []byte(msg)})
		if err != nil { //check each write, not just the last
			fmt.Println("failed to write message:", err)
			return
		}
	}
} //end writeMessages

My function takes a string slice and uses a range loop to write each string to the partition through the connection (conn). The WriteMessages() method takes kafka.Message values as arguments; I initialize each message's Value field to the desired text as a byte slice. The method is variadic, so it can (and is probably intended to) write multiple Messages together as a batch. Messages are appended to the partition in sequential order.

It doesn't seem necessary to set a write deadline for short messages that are written almost instantly, but it is good practice to do so. In this example, I set it to 10 seconds.
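Since WriteMessages() is variadic, the loop could instead build one slice of messages and send them in a single call, as one batch. A sketch under that assumption (writeBatch is a hypothetical alternative to my writeMessages, using the same imports as the full program):

```go
//Writes all messages in a single variadic WriteMessages call, as one batch
//(hypothetical alternative to writeMessages; conn comes from connect())
func writeBatch(conn *kafka.Conn, msgs []string) {
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

	batch := make([]kafka.Message, 0, len(msgs))
	for _, m := range msgs {
		batch = append(batch, kafka.Message{Value: []byte(m)})
	}
	if _, err := conn.WriteMessages(batch...); err != nil {
		fmt.Println("failed to write batch:", err)
	}
}
```

One call means one round trip to the broker and a single error to check, at the cost of building the slice up front.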

Reading Messages from the Partition

//Reads all messages in the partition from the start
//Specify a minimum and maximum size in bytes to read (1 char = 1 byte)
func readMessages(conn *kafka.Conn, minSize int, maxSize int) {
	conn.SetReadDeadline(time.Now().Add(5 * time.Second)) //optional
	batch := conn.ReadBatch(minSize, maxSize) //get the batch

	msg := make([]byte, 10e3) //max length of each message (10KB here)
	for {
		msgSize, err := batch.Read(msg) //read the next message
		if err != nil { //EOF or other error
			break
		}
		fmt.Println(string(msg[:msgSize])) //print the message
	}

	if err := batch.Close(); err != nil { //make sure to close the batch
		fmt.Println("failed to close batch:", err)
	}
} //end readMessages

This function reads all messages from the partition and prints them to the console. The ReadBatch() method reads a batch through the connection, constrained by the specified minimum and maximum sizes in bytes, and returns it. Before processing its messages, I create a byte slice of 10KB (overkill for this example) to read each message into. One by one, in the infinite for-loop, each message is read into the slice and printed.

The Read() method also returns the size of the current message, which I use to slice the buffer down to just that message before printing. Each message is read into the same slice, so printing the whole slice could include leftover characters from a previous, longer message.

It is also not strictly necessary to set a read deadline for these short messages. batch.Read() returns an EOF error once it reaches the end of the batch, which breaks out of the for-loop in the succeeding statement.

Bonus: Reading Messages as a Consumer Group using kafka.Reader

//Read from the topic using kafka.Reader
//Readers can use consumer groups (but are not required to)
func readWithReader(topic string, groupID string) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092"}, //list every broker in the cluster here
		GroupID:  groupID, //optional; the group is created if it does not exist
		Topic:    topic,
		MaxBytes: 100, //maximum fetch size in bytes
		// more options are available
	})

	//Create a deadline and release its resources when done
	readDeadline, cancel := context.WithDeadline(context.Background(),
		time.Now().Add(5*time.Second))
	defer cancel()
	for {
		msg, err := r.ReadMessage(readDeadline) //gives up after 5 seconds with no message
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
	}

	if err := r.Close(); err != nil {
		fmt.Println("failed to close reader:", err)
	}
}

The Reader appears further down in the documentation examples, and I took the opportunity to experiment with it because it can use consumer groups. The previous read function does not execute as part of a consumer group, so the offset (which message to start reading from) is never committed: running readMessages() multiple times re-reads all messages each time unless the offset is manually specified, as shown in the sample execution. Calling readWithReader(), by contrast, starts from the last committed message. With consumer groups, the broker remembers the offset of each partition for each group and updates it automatically. To use them, simply set GroupID in the configuration passed to kafka.NewReader(); the group is created automatically if it does not already exist.

In addition to enabling consumer groups, Readers simplify reading from a partition. A Reader does not require a preexisting connection to the server (I don't need to pass the *kafka.Conn from my connect() function) because it manages its own. There are likely other advantages I have yet to discover.
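When a GroupID is set, ReadMessage() commits offsets automatically. If that is too eager, the Reader also exposes a FetchMessage()/CommitMessages() pair so the offset is committed only after the message has been processed. A sketch of that pattern (explicitCommit is my own name; ctx and r are a context and Reader set up as above):

```go
//Fetch messages in a consumer group, committing each offset explicitly
//only after the message has been processed
func explicitCommit(ctx context.Context, r *kafka.Reader) {
	for {
		msg, err := r.FetchMessage(ctx) //like ReadMessage, but no auto-commit
		if err != nil {
			break //deadline reached or reader closed
		}
		fmt.Println(string(msg.Value)) //"process" the message here
		if err := r.CommitMessages(ctx, msg); err != nil {
			fmt.Println("failed to commit offset:", err)
		}
	}
}
```

With this pattern, a message that fails mid-processing is not marked as consumed and will be delivered again on the next run.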

One issue I came across while following the documentation example was that the Reader's ReadMessage() never timed out after all messages were read, blocking the program from continuing. The example instead has this line in the for-loop:

 m, err := r.ReadMessage(context.Background())

I knew nothing about the context package beforehand and did not realize that this line gives ReadMessage() no deadline. The method "blocks until a message becomes available [unless the program specifies] a context to asynchronously cancel the blocking operation," according to its listing in the documentation. Unlike my earlier read and write functions, there is (to my knowledge) no SetReadDeadline or SetWriteDeadline for the Reader type — the context must be supplied in each ReadMessage() call.

I fixed this by creating a context with a deadline of 5 seconds from now and passing it to ReadMessage().

//Create a deadline and release its resources when done
readDeadline, cancel := context.WithDeadline(context.Background(),
	time.Now().Add(5*time.Second))
defer cancel()
//...
msg, err := r.ReadMessage(readDeadline)

After 5 seconds, the program continues running from the next line and breaks out of the loop.

Conclusion

This is, of course, only a simple program that exercises the most basic of Kafka's features, but it's important to at least know how to read and write messages using the Go client. One could simply copy-paste the code from the documentation (as I did while starting out), but I hope this tutorial imparted a bit more insight into how that code and my example work. Thank you for reading.