- read

Mastering Goroutines and Channels: A Dance of Concurrency in Go!

Rahul Chopra 113

GoRoutines

So, a GoRoutine is a function that executes simultaneously with other goroutines. We use goroutines to fire up multiple threads, which helps us to achieve concurrency.

Concurrency is a program’s ability to run more than one task independently over overlapping periods.

What is the difference between concurrency and parallelism?

func randSleep(name String, limit int, sleep int){
for(i=1;i<=n;i++){
fmt.println(name,rand.Intn(i))
time.sleep(time.Duration(sleep*int(time.second)))
}
}

func main(){
go randSleep("first:",4,3)
go randSleep("second:",4,3)
}

The above program will not print anything out in the terminal because the main function completes before the goroutines execute.

But if there is any sequential code after the goroutine, then for the time being, that goroutine will execute until the execution of the sequential code. For example:

func randSleep(name String, limit int, sleep int){
for i=1;i<=n;i++{
fmt.println(name,rand.Intn(i))
time.sleep(time.Duration(sleep*int(time.second)))
}
}

func main(){
go randSleep("first:",4,3)
randSleep("second:",4,3)
}

Now, in the above code, the goroutine will run concurrently with sequential code, irrespective of whether the goroutine is completed or not, because it is waiting for the execution of our sequential code.

To overcome the above problem, we will use wait groups, which is a part of the sync package.

Wait Groups

So, Wait Groups allow a program to wait for specified goroutines. These are sync mechanisms in Golang that block the execution of the program until goroutines in the WaitGroup completely execute.

func randSleep(wg *sync.WaitGroup, name String, limit int, sleep int){

defer wg.Done() //altering/subtracting the counter of goroutine when it completes
for i:=1;i<=limit;i++{
fmt.println(name,rand.Intn(i))
}
}

func main(){
wg:=new(sync.WaitGroup) //creates a new wait group

wg.Add(2) //it informs that it must wait for two goroutines

go randSleep(wg, "first:",5,3)
go randSleep(wg,"second:",5,3)

wg.Wait() //block the execution until the goroutine's execution completes.
}

So, the whole process is like adding to a counter in wg.Add() subtracts from the counter in wg.Done(), and waiting for the counter to hit 0 in wg.Wait().

Channels

It is a communication mechanism that allows GoRoutines to exchange data. Channels are bidirectional, which means either party can send or receive a message.

Bidirectional channels in Go are blocked, meaning that when sending data into a channel, Go waits until the data is read from the channel before execution continues.

func writeChannel(wg *sync.WaitGroup, ch chan int, stop int){
defer wg.Done()
for i:=1;i<=stop;i++{
ch<-i
}
}

func readChannel(wg *sync.WaitGroup, ch chan int, stop int){
defer wg.Done()
for i:=1;i<=stop;i++{
fmt.println(<-ch)
}
}

func main(){
wg:=new(sync.WaitGroup)
wg.Add(2)
limitChannel:=make(chan int) //unbuffered channel
defer close(limitChannel)
go writeChannel(wg,limitChannel,3)
go readChannel(wg,limitChannel,3)
wg.Wait()
}

So, the number of sending operations must be equal to the number of receiving operations; otherwise, we will get deadlock.

How can deadlock occur in the above code?

If the `writeChannel` goroutine writes more values to the channel than the `read Channel` goroutine can read, the `writeChannel` goroutine will eventually block when attempting to write, because the channel’s buffer is full.

Since the `writeChannel` goroutine is blocked and cannot proceed, it cannot reach the `wg.Done()` statement that would decrement the `sync.WaitGroup` counter.

As a result, the `wg.Wait()` call in the `main` function will never return, and the program will be stuck in a deadlock state.

Types of channels in go

Go mainly consists of two types of channels: buffered channels and Unbuffered channels. We are going to see how both of them are different and what their benefits are.

Buffered Channels

  1. We can create a channel that stores several values, meaning sending data into the channel won’t block until we exceed its capacity.
  2. In buffered channels, communication between goroutines is asynchronous. Asynchronous basically means that our program is not going to be in a block state, and the rest of its functionality will be in progress.
  3. A buffered channel follows the queue data structure, i.e., values inserted first in the channel will pop out first.
  4. In the case of buffered channels, the sender’s goroutine will not get blocked even if we populate the value in the channel. It will send the data onto the channel, and that data will essentially get queued, and that goroutine will continue doing what it was supposed to do.
func writeChannel(wg *sync.WaitGroup, limitchannel chan int, stop int) {
defer wg.Done()
for i := 1; i <= stop; i++ {
limitchannel <- i
}
}

func main() {
wg := new(sync.WaitGroup)
wg.Add(1)
limitchannel:= make(chan int, 2)
defer close(limitchannel)
go writeChannel(wg, limitchannel, 2)
wg.Wait()
fmt.Println(<-limitchannel)
fmt.Println(<-limitchannel)
}

What if the buffered channel reached its capacity?

So, if the buffered channel were actually full and the sending goroutine tried to put more data onto the channel at that point, that particular goroutine would be blocked until data was read from this channel.

Unbuffered Channels

  1. By default, channels are unbuffered, i.e., they need a receiver as soon as a message is emitted to the channel.
  2. This sender’s goroutine will be in a block state until the receiver's goroutine reads from the channel.
  3. The sender’s goroutine by default moves to the waiting state once we populate the data into the channel.
  4. Unbuffered channels only allow goroutines to communicate synchronously. Synchronous communication is when the sender needs to await the response from the receiver.

Unbuffered Channels: Fork-Join Model

func main(){
myChannel:=make(chan string)

//This is my anonymous function
go func(){
myChannel <- "data"
}() //here I'm invoking the function

msg:=<-myChannel
fmt.Println(msg)
}

In the above example, the go routine that is created by main has been forked (released from main) and is doing something. But we are re-joining it with the help of a channel.

So, the main will not terminate until we receive the value from the channel or close the channel.

select

A select statement is going to block the execution of the main function until one of its cases can run, and once it receives the message from the channel, it is going to execute the code within that block.

If multiple channels are receiving values in the select statement at the same time, then it is going to choose any one of them on a random basis.

func main(){

go func(){
myChannel <- "data"
}()

go func(){
anotherChannel <- "data data"
}()

select{
case msgFromMyChannel := <-myChannel:
fmt.Println(msgFromMyChannel)

case msgFromAnotherChannel := <-anotherChannel:
fmt.Println(msgFromAnotherChannel)
}
}
func main(){

charChannel:= make(chan string,3)
chars:=[]string{"a","b","c"}

for _,s:=range chars{
select{
case charChannel <- s:
}
}

close(charChannel)
for result:= range charChannel{
fmt.Println(result)
}
}

Close Channels

A closed channel in Golang denotes a case where we want to show that work has been done on this channel and there is no need for it to be open.

Once we close the channel, that channel will not accept any more value, and it will send a message that will indicate that the channel is closed.

func doWork(done <-chan bool){ //this way, the channel will behave as a read-only channel.
for{

Select{
case <- done:
return
default:
fmt.Println("Doing Work")
}
}
}

func main(){
done:= make(chan bool)
go doWork(done)
time.Sleep(time.second*3)
close(done) // The main go routine will send a signal after 3 seconds to its child goroutine, and by that way it will terminate the child goroutine.
}

Note: As The Go Programming Language stated, there is no need to close every channel when you are finished with it. It’s only necessary to close a channel when it is important to tell the receiving goroutines that all the data has been sent.

Pipeline

As per definition, it’s just one of many kinds of concurrent design patterns. Informally, a pipeline is a group of stages connected by channels and handled by goroutines.

There can be any number of inbound and outgoing channels in each step, with the exception of the first and last stages, which contain just outbound or inbound channels, respectively.

start (input) -> stage 1 (do something) -> stage 2 (do something) -> end

The above-designed model is called a "pipeline, in which each stage is responsible for doing some operation and we keep on passing data from one stage to another.

func sliceToChannel(nums []int) <-chan int{ //it will return read-only channel
out:= make(chan int)
go func(){
for _,n := range nums{
out <- n
}
close(out)
}()
return out
}
  • Explanation of the above function: So, we have made an unbuffered channel, which means it will be blocked until we read the value from the channel. So, once we insert the value in our channel inside the loop, that loop will be blocked until we read the value. So, the return statement will get executed, and in the background, the goroutine is still running.
func sq(in <-chan int) <-chan int{
out:= make(chan int)
go func(){
for n:= range in{
out <- n*n
}
close(out)
}()
return out
}
  • Explanation of the above function: Now, In this function, we are looping over the channel that we returned from the sliceToChannel function. So, our channel contains a value that we will read, and once we are done with the reading part, the next value will be inserted into the channel via the goroutine that is running in the background of the sliceToChannel function. So, both functions will follow a synchronous pattern.
func main(){
//input
nums:=[]int{2,3,4,7,1}
//stage 1
dataChannel:= sliceToChannel(nums)
//stage 2
finalChannel := sq(dataChannel)
//stage 3
for n:=range finalChannel{
fmt.Println(n)
}
}