Golang concurrency
Notes on golang concurrency
Goroutines and Channels
Goroutines is the golang way to support concurrency, or in other words, execute more than one task simultaneously. See the following code block:
package main
import (
"fmt"
"sync"
)
var wg = sync.WaitGroup{}
func f(n int) {
fmt.Println(n, "text")
wg.Done()
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1)
go f(i)
}
wg.Wait()
}
Running the above code block we’d have:
2 text
6 text
5 text
7 text
3 text
8 text
4 text
9 text
0 text
1 text
By colocating go
keyword before a function call, we are creating 10 goroutines (inside the for loop) and all of them run simultaneously. That’s why the output doesn’t have a specific order, they are concurrent!
Note that we are also using sync.WaitGroup
to sync the goroutine and acknowledge when a goroutine is done.
The WaitGroup in wg
starts at 0.
The wg.Add(1)
indicates we have one more goroutine to run.
The wg.Wait()
waits for the goroutines to be executed all, waits for the WaitGroup be 0 again so it can exit.
The wg.Done()
decrements the WaitGroup by -1, so it indicates the goroutine is done.
When running multiple goroutines that interact with each other, its common that they’ll run into race condition. The following example:
package main
import (
"fmt"
"sync"
)
var wg = sync.WaitGroup{}
var counter = 0
func main() {
for i := 0; i < 10; i++ {
wg.Add(2)
go sayHello()
go increment()
}
wg.Wait()
}
func sayHello() {
fmt.Printf("Hello #%v
", counter)
wg.Done()
}
func increment() {
counter++
wg.Done()
}
won’t yield a predictable output like Hello #1, Hello #2 ...
We can then use a concept called Mutex
to help manage the values and make sure only one goroutine interacts with it at a time. Mutex
gives the ability to lock and unlock the app. If the app is locked and someone tries to manipulate that value, it has to wait until it gets unlocked.
We can then, protect our code so that only one entity can manipulate the data at a single time. It protect from race conditions.
RXMutex
means ReadWrite Mutex. It means everybody can read the data but only one can modify (write) at a single time. And if something is reading the data, we can’t write to it.
So we have a infinite number of readers and only one writer. The writer has to wait until every reader reads the data, and only then it will be able to write. When writing to the data, it locks the data and no one can read or write to it, until it unlocks.
var wg = sync.WaitGroup{}
var counter = 0
var m = sync.RWMutex{}
func main() {
for i := 0; i < 10; i++ {
wg.Add(2)
m.RLock()
go sayHello()
m.Lock()
go increment()
}
wg.Wait()
}
func sayHello() {
fmt.Printf("Hello #%v
", counter)
m.RUnlock()
wg.Done()
}
func increment() {
counter++
m.Unlock()
wg.Done()
}
So we are using the m.RLock()
to lock the read in the sayHello
func. So after it prints the phrase, we unlock the read with m.RUnlock()
.
Then, in the increment
function we are modifiying the data so we have to call m.Lock
before, then we increment the value (counter++
) and the call m.Unlock()
.
- Note we are calling the
m.RLock()
andm.Lock()
before we call the functions. It’s because they have to be outside the goroutines to be able to lock those functions.
Difference between WaitGroup
and Mutex
: the WaitGroup
makes sure the app (or the following code block) waits until the goroutine is executed and the Mutex
allows multiple goroutines to interact with a value in a “synchronous” way, one at each time.
Channels
Channels are a way for goroutines to communicate with each other, pass data between them. We can declare a chanel like this:
var c chan string = make(chan string)
// or
c := make(chan string)
We can then pass values to the channel like c <- "some value"
, and consume the values from the channel like msg := <- c
. The arrow points to the direction where the value will be placed.
When a goroutine sends a messsage to the channel, it will wait until the consumer goroutine has received the message.
var wg = sync.WaitGroup{}
func pinger(c chan string) {
for i := 0; ; i++ {
c <- "ping"
}
}
func ponger(c chan string) {
for i := 0; ; i++ {
c <- "pong"
}
}
func printer(c chan string) {
for {
msg := <-c
fmt.Println(msg)
time.Sleep(time.Second * 1)
}
}
func main() {
var c chan string = make(chan string)
wg.Add(3)
go pinger(c)
go ponger(c)
go printer(c)
wg.Wait()
}
Note that we pass a copy of the data to the channel, so if the data is modified afterwards, it won’t be updated in the channel.
A channel by default sends and receive data, but we can scope down a channel for just sending or receiving data, following the pattern:
ch := make(chan int)
go func(ch <-chan int) {
i := <- ch
fmt.Println(i)
wg.Done()
}(ch)
go func(ch chan <- int) {
ch <- 42
wg.Done()
}(ch)
By receiving the channel as param, we can control the channel’s behavior:
ch <-chan int
it means data is flowing out of the channel. (receiving)
ch chan <- int
means we are sending data into the channel (chan) (sending)
We also have the ability to create “buffered” channels. Channels are synchronous in nature, but by specifying a third parameter when creating a channel, we can define a buffer so the channel becomes async:
var wg = sync.WaitGroup{}
func main() {
ch := make(chan int, 50)
wg.Add(2)
go func(ch <-chan int) {
for {
if i, ok := <- ch; ok {
fmt.Println(i)
} else {
break
}
}
wg.Done()
}(ch)
go func(ch chan<- int) {
ch <- 42
ch <- 27
close(ch)
wg.Done()
}(ch)
wg.Wait()
}
Buffers are used when your sender or receiver needs more time to process the data. For example, when a sender has too much information to send, the receiver has to deal with that somehow. That’s when we need buffers to put the data while the receiver process the incoming data.
Note that in the code above we are using close(ch)
in the sender to notify we are done inputing data (and can close the channel), so the application doesn’t run into a deadlock error mode.
On the receiver side, we loop through the channel and use the comma okay syntax to close it by breaking the loop if ok
is false.