A concorrência é um dos recursos mais poderosos de Go. Enquanto muitas linguagens tratam concorrência como um recurso avançado complexo, Go a torna acessível através de goroutines e channels. Este tutorial explora patterns avançados que você encontrará em aplicações Go de produção.

Pré-requisitos: Conhecimento básico de Go (variáveis, funções, structs) e familiaridade com goroutines e channels. Se você é novo em Go, confira nosso guia para iniciantes.

Sumário dos Patterns

PatternUse CaseComplexidade
Worker PoolProcessar tarefas em paralelo com controle⭐⭐
Fan-Out/Fan-InDistribuir trabalho e agregar resultados⭐⭐⭐
PipelineProcessar dados em estágios sequenciais⭐⭐
Rate LimitingControlar throughput de requisições⭐⭐⭐
Context CancellationCancelar operações de forma segura⭐⭐
Select StatementMultiplexar channels⭐⭐⭐

1. Worker Pool Pattern

O pattern Worker Pool é essencial quando você precisa processar um grande número de tarefas de forma concorrente, mas quer limitar o número de goroutines simultâneas.

Problema

Imagine que você precisa processar 10.000 arquivos. Iniciar 10.000 goroutines simultâneas esgotaria recursos rapidamente.

Solução: Worker Pool

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Job representa uma unidade de trabalho
type Job struct {
    ID   int
    Data string
}

// Result representa o resultado do processamento
type Result struct {
    JobID int
    Value int
    Error error
}

// Worker processa jobs do canal
type Worker struct {
    ID         int
    jobs       <-chan Job
    results    chan<- Result
    wg         *sync.WaitGroup
}

func (w *Worker) Start(ctx context.Context) {
    defer w.wg.Done()
    for {
        select {
        case job, ok := <-w.jobs:
            if !ok {
                fmt.Printf("Worker %d: canal fechado, encerrando\n", w.ID)
                return
            }
            w.processJob(job)
        case <-ctx.Done():
            fmt.Printf("Worker %d: contexto cancelado, encerrando\n", w.ID)
            return
        }
    }
}

func (w *Worker) processJob(job Job) {
    // Simula processamento
    time.Sleep(100 * time.Millisecond)
    
    result := Result{
        JobID: job.ID,
        Value: len(job.Data) * 10,
    }
    
    fmt.Printf("Worker %d: processou Job %d\n", w.ID, job.ID)
    w.results <- result
}

// Pool gerencia os workers
type Pool struct {
    numWorkers int
    jobs       chan Job
    results    chan Result
    workers    []*Worker
    wg         sync.WaitGroup
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewPool(numWorkers int) *Pool {
    ctx, cancel := context.WithCancel(context.Background())
    return &Pool{
        numWorkers: numWorkers,
        jobs:       make(chan Job, 100),
        results:    make(chan Result, 100),
        ctx:        ctx,
        cancel:     cancel,
    }
}

func (p *Pool) Start() {
    for i := 1; i <= p.numWorkers; i++ {
        worker := &Worker{
            ID:      i,
            jobs:    p.jobs,
            results: p.results,
            wg:      &p.wg,
        }
        p.workers = append(p.workers, worker)
        p.wg.Add(1)
        go worker.Start(p.ctx)
    }
}

func (p *Pool) Submit(job Job) {
    p.jobs <- job
}

func (p *Pool) Results() <-chan Result {
    return p.results
}

func (p *Pool) Shutdown() {
    close(p.jobs)
    p.wg.Wait()
    close(p.results)
}

func main() {
    // Cria pool com 5 workers
    pool := NewPool(5)
    pool.Start()
    
    // Goroutine para coletar resultados
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for result := range pool.Results() {
            fmt.Printf("Resultado recebido: Job %d = %d\n", 
                result.JobID, result.Value)
        }
    }()
    
    // Envia 15 jobs
    for i := 1; i <= 15; i++ {
        job := Job{
            ID:   i,
            Data: fmt.Sprintf("Dados do job %d", i),
        }
        pool.Submit(job)
    }
    
    // Aguarda processamento
    pool.Shutdown()
    wg.Wait()
    
    fmt.Println("\nPool encerrado com sucesso!")
}

Quando Usar Worker Pool

Use quando…Evite quando…
Processar muitas tarefas com paralelismo limitadoVocê tem poucas tarefas (overhead não vale a pena)
Precisar de controle de recursosCada tarefa depende fortemente das outras
Implementar APIs com rate limitingExistem requirements de ordem estrita

2. Fan-Out / Fan-In Pattern

Fan-Out distribui trabalho entre múltiplas goroutines. Fan-In combina os resultados de volta em um único canal.

Problema

Processar dados de múltiplas fontes simultaneamente e agregar resultados.

Implementação

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Generator (Producer) - fan-out source
func Generator(id string, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            fmt.Printf("%s gerando: %d\n", id, n)
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            out <- n
        }
    }()
    return out
}

// Worker - processa dados (fan-out)
func Worker(id int, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            // Processamento pesado simulado
            result := n * n
            fmt.Printf("Worker %d: %d -> %d\n", id, n, result)
            time.Sleep(50 * time.Millisecond)
            out <- result
        }
    }()
    return out
}

// Merge - fan-in, combina múltiplos canais
func Merge(cs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    // Função auxiliar para copiar de um canal
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }
    
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }
    
    // Goroutine para fechar out quando todos terminarem
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// Demonstração do pattern
func main() {
    rand.Seed(time.Now().Unix())
    
    // === FAN-OUT ===
    // Múltiplas fontes gerando dados
    gen1 := Generator("Fonte-A", 1, 2, 3, 4, 5)
    gen2 := Generator("Fonte-B", 6, 7, 8, 9, 10)
    
    // Distribuir para múltiplos workers
    worker1 := Worker(1, gen1)
    worker2 := Worker(2, gen2)
    worker3 := Worker(3, gen1) // Reutiliza gen1 
    
    // === FAN-IN ===
    // Combinar resultados em um único canal
    merged := Merge(worker1, worker2, worker3)
    
    // Consumir resultados
    var sum int
    for result := range merged {
        sum += result
        fmt.Printf("Resultado recebido: %d (soma parcial: %d)\n", result, sum)
    }
    
    fmt.Printf("\nSoma total: %d\n", sum)
}

Aplicação Real: Web Crawler

// Crawler usa fan-out/fan-in para processar URLs
func Crawl(urls []string) []Page {
    var crawlers []<-chan Page
    
    // Fan-out: múltiplos crawlers
    for i := 0; i < numCrawlers; i++ {
        crawlers = append(crawlers, Crawler(urls[i::numCrawlers]))
    }
    
    // Fan-in: merge dos resultados
    pages := []Page{}
    for page := range Merge(crawlers...) {
        pages = append(pages, page)
    }
    return pages
}

3. Pipeline Pattern

Um Pipeline processa dados através de múltiplos estágios sequenciais, onde cada estágio pode executar concorrentemente.

Implementação de Pipeline

package main

import (
    "fmt"
    "strings"
)

// === ESTÁGIOS DO PIPELINE ===

// 1. Generator - produz dados
func Generator(files ...string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for _, file := range files {
            out <- file
        }
    }()
    return out
}

// 2. ReadFile - lê conteúdo
func ReadFile(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for filename := range in {
            // Simula leitura
            content := fmt.Sprintf("conteúdo do arquivo: %s", filename)
            out <- content
        }
    }()
    return out
}

// 3. ProcessContent - processa texto
func ProcessContent(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for content := range in {
            // Processamento: conta palavras
            words := strings.Fields(content)
            processed := fmt.Sprintf("%s | palavras: %d", 
                content, len(words))
            out <- processed
        }
    }()
    return out
}

// 4. SaveResults - salva resultados
func SaveResults(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for data := range in {
            // Simula salvamento
            saved := fmt.Sprintf("[SALVO] %s", data)
            out <- saved
        }
    }()
    return out
}

// Compose conecta estágios
func Compose(stages ...func(<-chan string) <-chan string) func(<-chan string) <-chan string {
    return func(source <-chan string) <-chan string {
        c := source
        for _, stage := range stages {
            c = stage(c)
        }
        return c
    }
}

func main() {
    // Compor pipeline
    pipeline := Compose(
        ReadFile,
        ProcessContent,
        SaveResults,
    )
    
    // Executar
    files := Generator("doc1.txt", "doc2.txt", "doc3.txt")
    results := pipeline(files)
    
    // Consumir
    for result := range results {
        fmt.Println(result)
    }
}

Pipeline Paralelo (Com Buffer)

Para melhorar throughput, adicionamos parallelism a estágios individuais:

// ParallelStage executa múltiplas instâncias de um estágio
func ParallelStage(workers int, stage func(<-chan string) <-chan string) 
    func(<-chan string) <-chan string {
    return func(in <-chan string) <-chan string {
        // Fan-out para múltiplos workers
        var workers_out []<-chan string
        for i := 0; i < workers; i++ {
            workers_out = append(workers_out, stage(in))
        }
        // Fan-in dos resultados
        return Merge(workers_out...)
    }
}

// Uso: paralelizar estágio de processamento
processParallel := ParallelStage(4, ProcessContent)

4. Rate Limiting com Channels

Controle de taxa é essencial para não sobrecarregar APIs ou serviços externos.

Rate Limiting Básico

package main

import (
    "context"
    "fmt"
    "time"
)

// RateLimiter controla requests por segundo
type RateLimiter struct {
    requests chan struct{}
    interval time.Duration
}

func NewRateLimiter(reqPerSec int) *RateLimiter {
    rl := &RateLimiter{
        requests: make(chan struct{}, reqPerSec),
        interval: time.Second / time.Duration(reqPerSec),
    }
    
    // Preenche o bucket
    for i := 0; i < reqPerSec; i++ {
        rl.requests <- struct{}{}
    }
    
    // Refill contínuo
    go rl.refill(reqPerSec)
    
    return rl
}

func (rl *RateLimiter) refill(reqPerSec int) {
    ticker := time.NewTicker(rl.interval)
    defer ticker.Stop()
    
    for range ticker.C {
        select {
        case rl.requests <- struct{}{}:
            // Adicionou token
        default:
            // Bucket cheio, ignora
        }
    }
}

func (rl *RateLimiter) Acquire(ctx context.Context) error {
    select {
    case <-rl.requests:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// Uso
func main() {
    // Permite 5 requisições por segundo
    limiter := NewRateLimiter(5)
    ctx := context.Background()
    
    for i := 1; i <= 15; i++ {
        if err := limiter.Acquire(ctx); err != nil {
            fmt.Printf("Erro: %v\n", err)
            return
        }
        fmt.Printf("Request %d executado em %s\n", i, time.Now().Format("15:04:05.000"))
    }
}

Rate Limiting Avançado: Token Bucket

package main

import (
    "context"
    "sync"
    "time"
)

// TokenBucket implementa token bucket algorithm
type TokenBucket struct {
    capacity int
    tokens   int
    refillRate time.Duration
    mu       sync.Mutex
    lastRefill time.Time
}

func NewTokenBucket(capacity int, refillRate time.Duration) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        tokens:     capacity,
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

func (tb *TokenBucket) refill() {
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill)
    
    // Calcular tokens a adicionar
    tokensToAdd := int(elapsed / tb.refillRate)
    if tokensToAdd > 0 {
        tb.tokens = min(tb.capacity, tb.tokens + tokensToAdd)
        tb.lastRefill = now
    }
}

func (tb *TokenBucket) TryAcquire() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    tb.refill()
    
    if tb.tokens > 0 {
        tb.tokens--
        return true
    }
    return false
}

func (tb *TokenBucket) Acquire(ctx context.Context) error {
    for {
        if tb.TryAcquire() {
            return nil
        }
        
        select {
        case <-time.After(tb.refillRate):
            // Tenta novamente após refill
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

5. Context Package para Cancellation

O context package é essencial para gerenciar timeouts e cancellation em operações concorrentes.

Padrões de Uso

package main

import (
    "context"
    "fmt"
    "time"
)

// ProcessTask com timeout
type Task struct {
    ID   int
    Data string
}

func ProcessTask(ctx context.Context, task Task) (string, error) {
    // Cria context com timeout específico para esta tarefa
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()
    
    result := make(chan string, 1)
    
    go func() {
        // Simula processamento pesado
        time.Sleep(time.Duration(100+task.ID*100) * time.Millisecond)
        result <- fmt.Sprintf("Task %d processada: %s", task.ID, task.Data)
    }()
    
    select {
    case res := <-result:
        return res, nil
    case <-ctx.Done():
        return "", fmt.Errorf("task %d cancelada: %w", task.ID, ctx.Err())
    }
}

// Worker Pool com Context
func WorkerPool(ctx context.Context, tasks []Task, workers int) []string {
    var wg sync.WaitGroup
    taskChan := make(chan Task, len(tasks))
    resultChan := make(chan string, len(tasks))
    
    // Start workers
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for task := range taskChan {
                select {
                case <-ctx.Done():
                    fmt.Printf("Worker %d: cancelado\n", id)
                    return
                default:
                    result, err := ProcessTask(ctx, task)
                    if err != nil {
                        resultChan <- fmt.Sprintf("Erro: %v", err)
                    } else {
                        resultChan <- result
                    }
                }
            }
        }(i)
    }
    
    // Send tasks
    go func() {
        for _, task := range tasks {
            select {
            case taskChan <- task:
            case <-ctx.Done():
                close(taskChan)
                return
            }
        }
        close(taskChan)
    }()
    
    // Wait and collect
    go func() {
        wg.Wait()
        close(resultChan)
    }()
    
    var results []string
    for res := range resultChan {
        results = append(results, res)
    }
    
    return results
}

// Propagation de Context
func ProcessWithChildContext(parentCtx context.Context) {
    // Context com valor
    ctx := context.WithValue(parentCtx, "user_id", "12345")
    
    // Context com deadline
    ctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second))
    defer cancel()
    
    // Passar para funções downstream
    ProcessTask(ctx, Task{ID: 1, Data: "teste"})
}

Graceful Shutdown

func GracefulServer() {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()
    
    srv := &http.Server{Addr: ":8080"}
    
    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Erro: %v\n", err)
        }
    }()
    
    fmt.Println("Servidor rodando. Pressione Ctrl+C para parar.")
    
    <-ctx.Done()
    fmt.Println("\nShutdown iniciado...")
    
    // Cleanup com timeout
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    if err := srv.Shutdown(shutdownCtx); err != nil {
        log.Printf("Erro no shutdown: %v", err)
    }
    
    fmt.Println("Servidor encerrado com sucesso")
}

6. Select Statement Avançado

O select statement permite esperar por múltiplas operações de channels.

Pattern: Non-Blocking Send/Receive

select {
case ch <- data:
    // Enviou com sucesso
default:
    // Channel cheio, não bloqueia
}

// Non-blocking receive
select {
case data := <-ch:
    // Recebeu dados
default:
    // Sem dados disponíveis
}

Pattern: Timeout

func WithTimeout() {
    ch := make(chan string)
    
    select {
    case result := <-ch:
        fmt.Println(result)
    case <-time.After(5 * time.Second):
        fmt.Println("Timeout!")
    }
}

Pattern: Priority Select

func PrioritySelect(hi, lo <-chan string) {
    for {
        select {
        case msg := <-hi:
            // Alta prioridade
            fmt.Printf("HI: %s\n", msg)
        default:
            select {
            case msg := <-hi:
                fmt.Printf("HI: %s\n", msg)
            case msg := <-lo:
                fmt.Printf("LO: %s\n", msg)
            }
        }
    }
}

Pattern: Sensor/Aggregator

func Aggregator(sensors ...<-chan Reading) <-chan Alert {
    alerts := make(chan Alert)
    
    go func() {
        defer close(alerts)
        
        // Timeout entre leituras
        timeout := time.NewTimer(1 * time.Minute)
        defer timeout.Stop()
        
        for {
            select {
            case reading := <-sensors[0]:
                if reading.Temperature > 100 {
                    alerts <- Alert{Sensor: 0, Level: Critical}
                }
                timeout.Reset(1 * time.Minute)
                
            case reading := <-sensors[1]:
                if reading.Pressure < 10 {
                    alerts <- Alert{Sensor: 1, Level: Warning}
                }
                timeout.Reset(1 * time.Minute)
                
            case <-timeout.C:
                alerts <- Alert{Level: Timeout, Message: "Sem dados"}
                return
            }
        }
    }()
    
    return alerts
}

Comparação com Outras Linguagens

LanguageConcurrency ModelPrimitivasNota
GoCSP (Communicating Sequential Processes)Goroutines, ChannelsSimples, built-in
Rustasync/await + ownershipTokio, async-stdSeguro em compile-time
JavaThreads + ExecutorsCompletableFuture, StreamsMais verboso
Pythonasync/await (single-threaded)asyncio, coroutinesGIL limita parallelism
Node.jsEvent loopPromises, async/awaitSingle-threaded

Vantagens de Go:

  • Goroutines são leves (2KB stack inicial)
  • Channels são type-safe
  • Garbage collector gerencia goroutines
  • Sem complexidade de ownership (vs Rust)

Melhores Práticas

✅ Faça

  1. Sempre use defer close(ch) quando apropriado
  2. Use context para timeout e cancellation
  3. Adicione buffer quando conhecer a carga
  4. Documente se um channel é usado para sinalização ou dados
  5. Use select para timeouts e multiplexação

❌ Evite

// ❌ Não: partilhar memória por comunicação
var counter int
var mu sync.Mutex

go func() {
    mu.Lock()
    counter++
    mu.Unlock()
}()

// ✅ Sim: comunicar por channels
updates := make(chan int)
go func() {
    updates <- 1
}()
counter += <-updates
// ❌ Não: deixar goroutines orfãs
go longRunningTask()
// Sem forma de cancelar!

// ✅ Sim: usar context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go longRunningTask(ctx)

Exercícios Práticos

1. Implemente um Throttler

Crie um rate limiter que permite “bursts” de requests.

Ver solução
type Throttler struct {
    maxBurst int
    tokens   chan struct{}
}

func NewThrottler(maxBurst int, refillRate time.Duration) *Throttler {
    t := &Throttler{
        maxBurst: maxBurst,
        tokens:   make(chan struct{}, maxBurst),
    }
    
    // Pre-fill
    for i := 0; i < maxBurst; i++ {
        t.tokens <- struct{}{}
    }
    
    go t.refill(refillRate)
    return t
}

2. Merge com Prioridade

Modifique o Merge para priorizar canais específicos.

3. Pipeline com Error Handling

Adicione tratamento de errors a cada estágio do pipeline.

Conclusão

Os patterns de concorrência de Go são poderosos e expressivos:

Worker Pools para controle de recursos
Fan-Out/Fan-In para processamento paralelo
Pipelines para fluxos de processamento
Rate Limiting para proteção de recursos
Context para cancellation e timeouts
Select para multiplexação elegante

Próximos Passos

Recursos


Tem dúvidas sobre concorrência em Go? Participe da nossa comunidade no Discord!