7 Concurrency Patterns in Go You Should Know 🐹
Concurrency is one of Go’s most powerful features, and mastering it is crucial to building scalable and efficient applications. Here are 7 concurrency patterns in Go that you should know about.
1. Worker Pool
The Worker Pool pattern involves creating a fixed number of goroutines that process tasks from a shared queue. This pattern is useful for controlling the number of concurrent tasks, which is crucial for managing resource usage.
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker processes jobs until the jobs channel is closed.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, job)
		time.Sleep(time.Second) // simulate work
		fmt.Printf("Worker %d finished job %d\n", id, job)
		results <- job * 2
	}
}

func main() {
	const numJobs = 5
	const numWorkers = 3

	// Both channels are buffered to numJobs, so no send blocks in this example.
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	var wg sync.WaitGroup
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}

	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // no more jobs: workers leave their range loops

	wg.Wait()
	close(results)

	for result := range results {
		fmt.Println("Result:", result)
	}
}
Real-world Scenario:
A web server handling incoming HTTP requests, where each request is processed by a worker from the pool.
2. Fan-Out, Fan-In
Fan-Out starts multiple goroutines that read from the same source and process data in parallel. Fan-In merges the results of those goroutines into a single channel. This pattern is useful when work can be processed in parallel and the results need to be collected in one place.
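package main

import (
	"fmt"
	"sync"
)

// producer sends five values into ch.
func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 5; i++ {
		ch <- i
		fmt.Printf("Producer %d produced %d\n", id, i)
	}
}

// consumer doubles every value it reads from in and sends it to out.
func consumer(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for v := range in {
		out <- v * 2
		fmt.Printf("Consumer %d processed %d\n", id, v)
	}
}

func main() {
	numProducers := 2
	numConsumers := 2
	input := make(chan int, 10)
	output := make(chan int, 10)

	// Separate WaitGroups let producers and consumers run at the same time.
	var producers, consumers sync.WaitGroup

	for i := 1; i <= numProducers; i++ {
		producers.Add(1)
		go producer(i, input, &producers)
	}
	// Fan-out: every consumer reads from the same input channel.
	for i := 1; i <= numConsumers; i++ {
		consumers.Add(1)
		go consumer(i, input, output, &consumers)
	}

	// Close each channel once all of its senders have finished.
	go func() {
		producers.Wait()
		close(input)
	}()
	go func() {
		consumers.Wait()
		close(output)
	}()

	// Fan-in: all results arrive on the single output channel.
	for result := range output {
		fmt.Println("Result:", result)
	}
}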
Real-world Scenario:
A data processing pipeline where different processing stages are handled by different sets of workers.
3. Pipeline
The Pipeline pattern involves chaining together a series of stages where each stage performs a transformation on the data and passes it on to the next stage. It is useful in scenarios where data needs to go through multiple sequential processing steps.
package main

import "fmt"

// stage1 is the source: it emits each number from the slice.
func stage1(nums []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// stage2 doubles every value it receives.
func stage2(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * 2
		}
	}()
	return out
}

// stage3 adds one to every value it receives.
func stage3(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n + 1
		}
	}()
	return out
}

func main() {
	nums := []int{1, 2, 3, 4, 5}

	// Each stage closes its output when its input is exhausted,
	// so the final range loop ends on its own.
	c1 := stage1(nums)
	c2 := stage2(c1)
	c3 := stage3(c2)

	for result := range c3 {
		fmt.Println(result)
	}
}
Real-world Scenario:
An image processing system where an image goes through several stages such as resizing, filtering and encoding.
4. Publish-Subscribe
The Publish-Subscribe pattern allows messages to be published to multiple subscribers. This pattern is useful in systems where different services need to react independently to certain events or message types.
package main

import (
	"fmt"
	"sync"
)

type PubSub struct {
	mu       sync.Mutex
	channels map[string][]chan string
}

func NewPubSub() *PubSub {
	return &PubSub{
		channels: make(map[string][]chan string),
	}
}

// Subscribe registers a new subscriber channel for topic.
func (ps *PubSub) Subscribe(topic string) <-chan string {
	ch := make(chan string)
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.channels[topic] = append(ps.channels[topic], ch)
	return ch
}

// Publish delivers msg to every subscriber of topic. The channels are
// unbuffered, so it blocks until each subscriber has received the message.
func (ps *PubSub) Publish(topic, msg string) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	for _, ch := range ps.channels[topic] {
		ch <- msg
	}
}

// Close closes every subscriber channel for topic and forgets the topic,
// so a second Close cannot close the same channels twice.
func (ps *PubSub) Close(topic string) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	for _, ch := range ps.channels[topic] {
		close(ch)
	}
	delete(ps.channels, topic)
}

func main() {
	ps := NewPubSub()
	subscriber1 := ps.Subscribe("news")
	subscriber2 := ps.Subscribe("news")

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for msg := range subscriber1 {
			fmt.Println("Subscriber 1 received:", msg)
		}
	}()
	go func() {
		defer wg.Done()
		for msg := range subscriber2 {
			fmt.Println("Subscriber 2 received:", msg)
		}
	}()

	ps.Publish("news", "Breaking News!")
	ps.Publish("news", "Another News!")

	// Publish returns only after every subscriber has received the message,
	// so no sleep is needed before closing the topic.
	ps.Close("news")
	wg.Wait()
}
Real-world Scenario:
A messaging system where different services subscribe to certain types of events or messages.
5. Select with Timeout
Using the select statement with a timeout lets you avoid blocking indefinitely. This pattern is useful when you want to abort, or fall back to another action, if an operation takes too long.
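package main

import (
	"fmt"
	"time"
)

func main() {
	// Capacity 1 lets the goroutine finish its send even after the
	// timeout fires, so it does not block forever.
	c := make(chan string, 1)
	go func() {
		time.Sleep(2 * time.Second) // simulate a slow operation
		c <- "result"
	}()

	select {
	case res := <-c:
		fmt.Println("Received:", res)
	case <-time.After(1 * time.Second):
		fmt.Println("Timeout")
	}
}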
Real-world Scenario:
A network client attempts to connect to a server and stops if the server does not respond in time.
6. Semaphore
A Semaphore limits the number of goroutines that can access a particular resource concurrently. This pattern is useful for controlling concurrency and avoiding resource overload.
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, sem chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	sem <- struct{}{}        // acquire the semaphore
	defer func() { <-sem }() // release the semaphore on return

	fmt.Printf("Worker %d starting\n", id)
	time.Sleep(time.Second) // simulate work
	fmt.Printf("Worker %d done\n", id)
}

func main() {
	const numWorkers = 5
	const maxConcurrent = 2

	// The channel capacity is the maximum number of concurrent holders.
	sem := make(chan struct{}, maxConcurrent)

	var wg sync.WaitGroup
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, sem, &wg)
	}
	wg.Wait()
}
Real-world Scenario:
A database connection pool where a limited number of connections are allowed at a time.
7. Rate Limiting
Rate Limiting controls how often events are processed. The example below uses a ticker to release one request per interval. This pattern is useful when you need to control the frequency of certain tasks, such as requests to an API.
package main

import (
	"fmt"
	"time"
)

func main() {
	rate := time.Second
	ticker := time.NewTicker(rate)
	defer ticker.Stop()

	requests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		requests <- i
	}
	close(requests)

	for req := range requests {
		<-ticker.C // wait for the next tick
		fmt.Println("Processing request", req)
	}
}
Real-world Scenario:
An API gateway that limits the number of requests a user can make in a given period of time.
Conclusion
Concurrency patterns in Go are essential for building efficient and scalable applications. Mastering these patterns will allow you to handle concurrency effectively, optimizing resource usage and improving the performance of your applications.
Thanks for reading! If you enjoyed this post, be sure to share it and follow me for more content about Go and software development.