A concorrência é um dos recursos mais poderosos de Go. Enquanto muitas linguagens tratam concorrência como um recurso avançado complexo, Go a torna acessível através de goroutines e channels. Este tutorial explora patterns avançados que você encontrará em aplicações Go de produção.
Pré-requisitos: Conhecimento básico de Go (variáveis, funções, structs) e familiaridade com goroutines e channels. Se você é novo em Go, confira nosso guia para iniciantes.
Sumário dos Patterns
| Pattern | Use Case | Complexidade |
|---|---|---|
| Worker Pool | Processar tarefas em paralelo com controle | ⭐⭐ |
| Fan-Out/Fan-In | Distribuir trabalho e agregar resultados | ⭐⭐⭐ |
| Pipeline | Processar dados em estágios sequenciais | ⭐⭐ |
| Rate Limiting | Controlar throughput de requisições | ⭐⭐⭐ |
| Context Cancellation | Cancelar operações de forma segura | ⭐⭐ |
| Select Statement | Multiplexar channels | ⭐⭐⭐ |
1. Worker Pool Pattern
O pattern Worker Pool é essencial quando você precisa processar um grande número de tarefas de forma concorrente, mas quer limitar o número de goroutines simultâneas.
Problema
Imagine que você precisa processar 10.000 arquivos. Iniciar 10.000 goroutines simultâneas esgotaria recursos rapidamente.
Solução: Worker Pool
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Job representa uma unidade de trabalho
type Job struct {
ID int
Data string
}
// Result representa o resultado do processamento
type Result struct {
JobID int
Value int
Error error
}
// Worker processa jobs do canal
type Worker struct {
ID int
jobs <-chan Job
results chan<- Result
wg *sync.WaitGroup
}
func (w *Worker) Start(ctx context.Context) {
defer w.wg.Done()
for {
select {
case job, ok := <-w.jobs:
if !ok {
fmt.Printf("Worker %d: canal fechado, encerrando\n", w.ID)
return
}
w.processJob(job)
case <-ctx.Done():
fmt.Printf("Worker %d: contexto cancelado, encerrando\n", w.ID)
return
}
}
}
func (w *Worker) processJob(job Job) {
// Simula processamento
time.Sleep(100 * time.Millisecond)
result := Result{
JobID: job.ID,
Value: len(job.Data) * 10,
}
fmt.Printf("Worker %d: processou Job %d\n", w.ID, job.ID)
w.results <- result
}
// Pool gerencia os workers
type Pool struct {
numWorkers int
jobs chan Job
results chan Result
workers []*Worker
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewPool(numWorkers int) *Pool {
ctx, cancel := context.WithCancel(context.Background())
return &Pool{
numWorkers: numWorkers,
jobs: make(chan Job, 100),
results: make(chan Result, 100),
ctx: ctx,
cancel: cancel,
}
}
func (p *Pool) Start() {
for i := 1; i <= p.numWorkers; i++ {
worker := &Worker{
ID: i,
jobs: p.jobs,
results: p.results,
wg: &p.wg,
}
p.workers = append(p.workers, worker)
p.wg.Add(1)
go worker.Start(p.ctx)
}
}
func (p *Pool) Submit(job Job) {
p.jobs <- job
}
func (p *Pool) Results() <-chan Result {
return p.results
}
func (p *Pool) Shutdown() {
close(p.jobs)
p.wg.Wait()
close(p.results)
}
func main() {
// Cria pool com 5 workers
pool := NewPool(5)
pool.Start()
// Goroutine para coletar resultados
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for result := range pool.Results() {
fmt.Printf("Resultado recebido: Job %d = %d\n",
result.JobID, result.Value)
}
}()
// Envia 15 jobs
for i := 1; i <= 15; i++ {
job := Job{
ID: i,
Data: fmt.Sprintf("Dados do job %d", i),
}
pool.Submit(job)
}
// Aguarda processamento
pool.Shutdown()
wg.Wait()
fmt.Println("\nPool encerrado com sucesso!")
}
Quando Usar Worker Pool
| Use quando… | Evite quando… |
|---|---|
| Processar muitas tarefas com paralelismo limitado | Você tem poucas tarefas (overhead não vale a pena) |
| Precisar de controle de recursos | Cada tarefa depende fortemente das outras |
| Implementar APIs com rate limiting | Existem requirements de ordem estrita |
2. Fan-Out / Fan-In Pattern
Fan-Out distribui trabalho entre múltiplas goroutines. Fan-In combina os resultados de volta em um único canal.
Problema
Processar dados de múltiplas fontes simultaneamente e agregar resultados.
Implementação
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Generator (Producer) - fan-out source
func Generator(id string, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
fmt.Printf("%s gerando: %d\n", id, n)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
out <- n
}
}()
return out
}
// Worker - processa dados (fan-out)
func Worker(id int, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
// Processamento pesado simulado
result := n * n
fmt.Printf("Worker %d: %d -> %d\n", id, n, result)
time.Sleep(50 * time.Millisecond)
out <- result
}
}()
return out
}
// Merge - fan-in, combina múltiplos canais
func Merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
// Função auxiliar para copiar de um canal
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Goroutine para fechar out quando todos terminarem
go func() {
wg.Wait()
close(out)
}()
return out
}
// Demonstração do pattern
func main() {
rand.Seed(time.Now().Unix())
// === FAN-OUT ===
// Múltiplas fontes gerando dados
gen1 := Generator("Fonte-A", 1, 2, 3, 4, 5)
gen2 := Generator("Fonte-B", 6, 7, 8, 9, 10)
// Distribuir para múltiplos workers
worker1 := Worker(1, gen1)
worker2 := Worker(2, gen2)
worker3 := Worker(3, gen1) // Reutiliza gen1
// === FAN-IN ===
// Combinar resultados em um único canal
merged := Merge(worker1, worker2, worker3)
// Consumir resultados
var sum int
for result := range merged {
sum += result
fmt.Printf("Resultado recebido: %d (soma parcial: %d)\n", result, sum)
}
fmt.Printf("\nSoma total: %d\n", sum)
}
Aplicação Real: Web Crawler
// Crawler usa fan-out/fan-in para processar URLs
func Crawl(urls []string) []Page {
var crawlers []<-chan Page
// Fan-out: múltiplos crawlers
for i := 0; i < numCrawlers; i++ {
crawlers = append(crawlers, Crawler(urls[i::numCrawlers]))
}
// Fan-in: merge dos resultados
pages := []Page{}
for page := range Merge(crawlers...) {
pages = append(pages, page)
}
return pages
}
3. Pipeline Pattern
Um Pipeline processa dados através de múltiplos estágios sequenciais, onde cada estágio pode executar concorrentemente.
Implementação de Pipeline
package main
import (
"fmt"
"strings"
)
// === ESTÁGIOS DO PIPELINE ===
// 1. Generator - produz dados
func Generator(files ...string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for _, file := range files {
out <- file
}
}()
return out
}
// 2. ReadFile - lê conteúdo
func ReadFile(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for filename := range in {
// Simula leitura
content := fmt.Sprintf("conteúdo do arquivo: %s", filename)
out <- content
}
}()
return out
}
// 3. ProcessContent - processa texto
func ProcessContent(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for content := range in {
// Processamento: conta palavras
words := strings.Fields(content)
processed := fmt.Sprintf("%s | palavras: %d",
content, len(words))
out <- processed
}
}()
return out
}
// 4. SaveResults - salva resultados
func SaveResults(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for data := range in {
// Simula salvamento
saved := fmt.Sprintf("[SALVO] %s", data)
out <- saved
}
}()
return out
}
// Compose conecta estágios
func Compose(stages ...func(<-chan string) <-chan string) func(<-chan string) <-chan string {
return func(source <-chan string) <-chan string {
c := source
for _, stage := range stages {
c = stage(c)
}
return c
}
}
func main() {
// Compor pipeline
pipeline := Compose(
ReadFile,
ProcessContent,
SaveResults,
)
// Executar
files := Generator("doc1.txt", "doc2.txt", "doc3.txt")
results := pipeline(files)
// Consumir
for result := range results {
fmt.Println(result)
}
}
Pipeline Paralelo (Com Buffer)
Para melhorar throughput, adicionamos parallelism a estágios individuais:
// ParallelStage executa múltiplas instâncias de um estágio
func ParallelStage(workers int, stage func(<-chan string) <-chan string)
func(<-chan string) <-chan string {
return func(in <-chan string) <-chan string {
// Fan-out para múltiplos workers
var workers_out []<-chan string
for i := 0; i < workers; i++ {
workers_out = append(workers_out, stage(in))
}
// Fan-in dos resultados
return Merge(workers_out...)
}
}
// Uso: paralelizar estágio de processamento
processParallel := ParallelStage(4, ProcessContent)
4. Rate Limiting com Channels
Controle de taxa é essencial para não sobrecarregar APIs ou serviços externos.
Rate Limiting Básico
package main
import (
"context"
"fmt"
"time"
)
// RateLimiter controla requests por segundo
type RateLimiter struct {
requests chan struct{}
interval time.Duration
}
func NewRateLimiter(reqPerSec int) *RateLimiter {
rl := &RateLimiter{
requests: make(chan struct{}, reqPerSec),
interval: time.Second / time.Duration(reqPerSec),
}
// Preenche o bucket
for i := 0; i < reqPerSec; i++ {
rl.requests <- struct{}{}
}
// Refill contínuo
go rl.refill(reqPerSec)
return rl
}
func (rl *RateLimiter) refill(reqPerSec int) {
ticker := time.NewTicker(rl.interval)
defer ticker.Stop()
for range ticker.C {
select {
case rl.requests <- struct{}{}:
// Adicionou token
default:
// Bucket cheio, ignora
}
}
}
func (rl *RateLimiter) Acquire(ctx context.Context) error {
select {
case <-rl.requests:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// Uso
func main() {
// Permite 5 requisições por segundo
limiter := NewRateLimiter(5)
ctx := context.Background()
for i := 1; i <= 15; i++ {
if err := limiter.Acquire(ctx); err != nil {
fmt.Printf("Erro: %v\n", err)
return
}
fmt.Printf("Request %d executado em %s\n", i, time.Now().Format("15:04:05.000"))
}
}
Rate Limiting Avançado: Token Bucket
package main
import (
"context"
"sync"
"time"
)
// TokenBucket implementa token bucket algorithm
type TokenBucket struct {
capacity int
tokens int
refillRate time.Duration
mu sync.Mutex
lastRefill time.Time
}
func NewTokenBucket(capacity int, refillRate time.Duration) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
refillRate: refillRate,
lastRefill: time.Now(),
}
}
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefill)
// Calcular tokens a adicionar
tokensToAdd := int(elapsed / tb.refillRate)
if tokensToAdd > 0 {
tb.tokens = min(tb.capacity, tb.tokens + tokensToAdd)
tb.lastRefill = now
}
}
func (tb *TokenBucket) TryAcquire() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}
func (tb *TokenBucket) Acquire(ctx context.Context) error {
for {
if tb.TryAcquire() {
return nil
}
select {
case <-time.After(tb.refillRate):
// Tenta novamente após refill
case <-ctx.Done():
return ctx.Err()
}
}
}
5. Context Package para Cancellation
O context package é essencial para gerenciar timeouts e cancellation em operações concorrentes.
Padrões de Uso
package main
import (
"context"
"fmt"
"time"
)
// ProcessTask com timeout
type Task struct {
ID int
Data string
}
func ProcessTask(ctx context.Context, task Task) (string, error) {
// Cria context com timeout específico para esta tarefa
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
result := make(chan string, 1)
go func() {
// Simula processamento pesado
time.Sleep(time.Duration(100+task.ID*100) * time.Millisecond)
result <- fmt.Sprintf("Task %d processada: %s", task.ID, task.Data)
}()
select {
case res := <-result:
return res, nil
case <-ctx.Done():
return "", fmt.Errorf("task %d cancelada: %w", task.ID, ctx.Err())
}
}
// Worker Pool com Context
func WorkerPool(ctx context.Context, tasks []Task, workers int) []string {
var wg sync.WaitGroup
taskChan := make(chan Task, len(tasks))
resultChan := make(chan string, len(tasks))
// Start workers
for i := 0; i < workers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for task := range taskChan {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: cancelado\n", id)
return
default:
result, err := ProcessTask(ctx, task)
if err != nil {
resultChan <- fmt.Sprintf("Erro: %v", err)
} else {
resultChan <- result
}
}
}
}(i)
}
// Send tasks
go func() {
for _, task := range tasks {
select {
case taskChan <- task:
case <-ctx.Done():
close(taskChan)
return
}
}
close(taskChan)
}()
// Wait and collect
go func() {
wg.Wait()
close(resultChan)
}()
var results []string
for res := range resultChan {
results = append(results, res)
}
return results
}
// Propagation de Context
func ProcessWithChildContext(parentCtx context.Context) {
// Context com valor
ctx := context.WithValue(parentCtx, "user_id", "12345")
// Context com deadline
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second))
defer cancel()
// Passar para funções downstream
ProcessTask(ctx, Task{ID: 1, Data: "teste"})
}
Graceful Shutdown
func GracefulServer() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
srv := &http.Server{Addr: ":8080"}
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Erro: %v\n", err)
}
}()
fmt.Println("Servidor rodando. Pressione Ctrl+C para parar.")
<-ctx.Done()
fmt.Println("\nShutdown iniciado...")
// Cleanup com timeout
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("Erro no shutdown: %v", err)
}
fmt.Println("Servidor encerrado com sucesso")
}
6. Select Statement Avançado
O select statement permite esperar por múltiplas operações de channels.
Pattern: Non-Blocking Send/Receive
select {
case ch <- data:
// Enviou com sucesso
default:
// Channel cheio, não bloqueia
}
// Non-blocking receive
select {
case data := <-ch:
// Recebeu dados
default:
// Sem dados disponíveis
}
Pattern: Timeout
func WithTimeout() {
ch := make(chan string)
select {
case result := <-ch:
fmt.Println(result)
case <-time.After(5 * time.Second):
fmt.Println("Timeout!")
}
}
Pattern: Priority Select
func PrioritySelect(hi, lo <-chan string) {
for {
select {
case msg := <-hi:
// Alta prioridade
fmt.Printf("HI: %s\n", msg)
default:
select {
case msg := <-hi:
fmt.Printf("HI: %s\n", msg)
case msg := <-lo:
fmt.Printf("LO: %s\n", msg)
}
}
}
}
Pattern: Sensor/Aggregator
func Aggregator(sensors ...<-chan Reading) <-chan Alert {
alerts := make(chan Alert)
go func() {
defer close(alerts)
// Timeout entre leituras
timeout := time.NewTimer(1 * time.Minute)
defer timeout.Stop()
for {
select {
case reading := <-sensors[0]:
if reading.Temperature > 100 {
alerts <- Alert{Sensor: 0, Level: Critical}
}
timeout.Reset(1 * time.Minute)
case reading := <-sensors[1]:
if reading.Pressure < 10 {
alerts <- Alert{Sensor: 1, Level: Warning}
}
timeout.Reset(1 * time.Minute)
case <-timeout.C:
alerts <- Alert{Level: Timeout, Message: "Sem dados"}
return
}
}
}()
return alerts
}
Comparação com Outras Linguagens
| Language | Concurrency Model | Primitivas | Nota |
|---|---|---|---|
| Go | CSP (Communicating Sequential Processes) | Goroutines, Channels | Simples, built-in |
| Rust | async/await + ownership | Tokio, async-std | Seguro em compile-time |
| Java | Threads + Executors | CompletableFuture, Streams | Mais verboso |
| Python | async/await (single-threaded) | asyncio, coroutines | GIL limita parallelism |
| Node.js | Event loop | Promises, async/await | Single-threaded |
Vantagens de Go:
- Goroutines são leves (2KB stack inicial)
- Channels são type-safe
- Garbage collector gerencia goroutines
- Sem complexidade de ownership (vs Rust)
Melhores Práticas
✅ Faça
- Sempre use
defer close(ch)quando apropriado - Use
contextpara timeout e cancellation - Adicione buffer quando conhecer a carga
- Documente se um channel é usado para sinalização ou dados
- Use select para timeouts e multiplexação
❌ Evite
// ❌ Não: partilhar memória por comunicação
var counter int
var mu sync.Mutex
go func() {
mu.Lock()
counter++
mu.Unlock()
}()
// ✅ Sim: comunicar por channels
updates := make(chan int)
go func() {
updates <- 1
}()
counter += <-updates
// ❌ Não: deixar goroutines orfãs
go longRunningTask()
// Sem forma de cancelar!
// ✅ Sim: usar context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go longRunningTask(ctx)
Exercícios Práticos
1. Implemente um Throttler
Crie um rate limiter que permite “bursts” de requests.
Ver solução
type Throttler struct {
maxBurst int
tokens chan struct{}
}
func NewThrottler(maxBurst int, refillRate time.Duration) *Throttler {
t := &Throttler{
maxBurst: maxBurst,
tokens: make(chan struct{}, maxBurst),
}
// Pre-fill
for i := 0; i < maxBurst; i++ {
t.tokens <- struct{}{}
}
go t.refill(refillRate)
return t
}
2. Merge com Prioridade
Modifique o Merge para priorizar canais específicos.
3. Pipeline com Error Handling
Adicione tratamento de errors a cada estágio do pipeline.
Conclusão
Os patterns de concorrência de Go são poderosos e expressivos:
✅ Worker Pools para controle de recursos
✅ Fan-Out/Fan-In para processamento paralelo
✅ Pipelines para fluxos de processamento
✅ Rate Limiting para proteção de recursos
✅ Context para cancellation e timeouts
✅ Select para multiplexação elegante
Próximos Passos
- 📚 Golang para Iniciantes — revisar fundamentos
- 🌐 Criando APIs REST em Go — aplicar patterns em APIs
- ⚡ Performance em Go — otimizar goroutines
Recursos
- 🎥 Go Concurrency Patterns - Rob Pike
- 📖 Go Memory Model
- 🔧 go.uber.org/goleak - detectar goroutine leaks
Tem dúvidas sobre concorrência em Go? Participe da nossa comunidade no Discord!