← Voltar para o blog

Padrões de Concorrência em Go: Worker Pools, Fan-Out e Pipelines

Domine padrões de concorrência em Go: worker pools, fan-out/fan-in, pipelines, rate limiting e context para cancelamento. Exemplos práticos e prontos para uso.

Go nasceu para concorrência. Goroutines leves, channels tipados e o modelo CSP (Communicating Sequential Processes) tornam Go uma das linguagens mais produtivas para escrever código concorrente. Mas goroutines e channels são primitivas — para resolver problemas reais, você precisa de padrões.

Neste guia, vamos cobrir os padrões de concorrência mais importantes em Go, com exemplos prontos para produção. Se você ainda não domina os fundamentos, comece pelo nosso guia de concorrência em Go.

Recap: Goroutines e Channels

Goroutines são funções executadas concorrentemente com custo mínimo (~2KB de stack inicial). Channels são o mecanismo de comunicação segura entre goroutines:

ch := make(chan string)    // channel unbuffered
ch := make(chan string, 10) // channel buffered (capacidade 10)

go func() {
    ch <- "resultado" // envia
}()

valor := <-ch // recebe

A regra de ouro: não comunique compartilhando memória; compartilhe memória comunicando.

O Padrão Pipeline

Um pipeline é uma série de estágios conectados por channels, onde cada estágio é um grupo de goroutines que recebe valores de um channel de entrada, processa e envia para um channel de saída.

// Estágio 1: gera números
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// Estágio 2: eleva ao quadrado
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// Estágio 3: filtra pares
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()
    return out
}

func main() {
    // Conecta os estágios
    nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(nums)
    evens := filterEven(squared)

    for result := range evens {
        fmt.Println(result) // 4, 16, 36, 64, 100
    }
}

Cada estágio roda em sua própria goroutine, processando dados assim que chegam. Pipelines são a base de sistemas de streaming e ETL.

Fan-Out / Fan-In

Fan-out é quando múltiplas goroutines leem do mesmo channel. Fan-in é quando múltiplos channels são combinados em um só. Juntos, permitem paralelismo real:

// Fan-out: múltiplos workers processando do mesmo channel
func fanOut(in <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        channels[i] = processWorker(in)
    }
    return channels
}

func processWorker(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- heavyComputation(n)
        }
    }()
    return out
}

// Fan-in: merge múltiplos channels em um
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    output := func(ch <-chan int) {
        defer wg.Done()
        for val := range ch {
            merged <- val
        }
    }

    wg.Add(len(channels))
    for _, ch := range channels {
        go output(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    jobs := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    workers := fanOut(jobs, 4)     // 4 workers paralelos
    results := fanIn(workers...)   // merge dos resultados

    for r := range results {
        fmt.Println(r)
    }
}

Fan-out/fan-in é ideal quando você tem tarefas CPU-bound ou I/O-bound que podem ser processadas independentemente.

Worker Pool

O worker pool é talvez o padrão mais usado em produção. Um número fixo de goroutines processa jobs de uma fila:

type Job struct {
    ID  int
    URL string
}

type Result struct {
    Job    Job
    Status int
    Err    error
}

func workerPool(jobs <-chan Job, results chan<- Result, numWorkers int) {
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for job := range jobs {
                resp, err := http.Get(job.URL)
                status := 0
                if resp != nil {
                    status = resp.StatusCode
                    resp.Body.Close()
                }
                results <- Result{
                    Job:    job,
                    Status: status,
                    Err:    err,
                }
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(results)
    }()
}

func main() {
    urls := []string{
        "https://golang.com.br",
        "https://go.dev",
        "https://pkg.go.dev",
        // ... mais URLs
    }

    jobs := make(chan Job, len(urls))
    results := make(chan Result, len(urls))

    // Inicia pool com 5 workers
    workerPool(jobs, results, 5)

    // Envia jobs
    for i, url := range urls {
        jobs <- Job{ID: i, URL: url}
    }
    close(jobs)

    // Coleta resultados
    for r := range results {
        if r.Err != nil {
            fmt.Printf("Job %d falhou: %v\n", r.Job.ID, r.Err)
        } else {
            fmt.Printf("Job %d: %s -> %d\n", r.Job.ID, r.Job.URL, r.Status)
        }
    }
}

Para projetos em produção que usam worker pools com HTTP, veja como estruturar melhor com APIs REST em Go e microsserviços.

Rate Limiting com time.Ticker

Controlar a taxa de requisições é essencial para não sobrecarregar APIs externas:

func rateLimitedWorker(jobs <-chan Job, results chan<- Result, rps int) {
    ticker := time.NewTicker(time.Second / time.Duration(rps))
    defer ticker.Stop()

    for job := range jobs {
        <-ticker.C // espera o tick antes de processar
        resp, err := http.Get(job.URL)
        status := 0
        if resp != nil {
            status = resp.StatusCode
            resp.Body.Close()
        }
        results <- Result{Job: job, Status: status, Err: err}
    }
}

O Padrão Generator

Generators produzem valores sob demanda, encapsulando a lógica de produção em uma goroutine. Desde o Go 1.23, os iteradores nativos oferecem uma alternativa para muitos cenários de generator, mas o padrão com channels ainda é útil quando você precisa de processamento concorrente real:

func fibonacci(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        a, b := 0, 1
        for {
            select {
            case <-ctx.Done():
                return
            case ch <- a:
                a, b = b, a+b
            }
        }
    }()
    return ch
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    for num := range fibonacci(ctx) {
        fmt.Println(num)
    }
}

Context para Cancelamento em Pipelines

O pacote context é fundamental para controlar o ciclo de vida de goroutines em pipelines. Sem cancelamento adequado, você cria goroutine leaks:

func processWithContext(ctx context.Context, in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return // cancelamento recebido
            case val, ok := <-in:
                if !ok {
                    return // channel fechado
                }
                result, err := expensiveOperation(val)
                if err != nil {
                    continue
                }
                select {
                case out <- result:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

sync.WaitGroup e sync.ErrGroup

sync.WaitGroup é a ferramenta padrão para esperar goroutines. Para cenários com tratamento de erros, errgroup.Group (do pacote golang.org/x/sync/errgroup) é ainda melhor:

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([]string, len(urls))

    for i, url := range urls {
        i, url := i, url // captura variáveis do loop
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return fmt.Errorf("criando request para %s: %w", url, err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetching %s: %w", url, err)
            }
            defer resp.Body.Close()

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return fmt.Errorf("lendo body de %s: %w", url, err)
            }

            results[i] = string(body)
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }

    return results, nil
}

O errgroup cancela automaticamente o context quando qualquer goroutine retorna erro — evitando trabalho desnecessário.

Evitando Goroutine Leaks

Goroutine leaks são o “memory leak” do Go. Acontecem quando goroutines ficam bloqueadas indefinidamente:

// ERRADO: goroutine leak se ninguém ler do channel
func bad() <-chan int {
    ch := make(chan int)
    go func() {
        ch <- 42 // bloqueia para sempre se ninguém ler
    }()
    return ch
}

// CORRETO: use context para garantir cleanup
func good(ctx context.Context) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        select {
        case ch <- 42:
        case <-ctx.Done():
        }
    }()
    return ch
}

Regras para evitar leaks:

  1. Sempre feche channels do lado do produtor
  2. Sempre use context para cancelamento
  3. Sempre use select com ctx.Done() em goroutines de longa duração
  4. Use buffered channels quando o produtor não deve bloquear

Buffered vs Unbuffered Channels

TipoQuando Usar
Unbuffered (make(chan T))Sincronização estrita, handoff direto
Buffered (make(chan T, n))Absorver bursts, desacoplar produtor/consumidor

Buffered channels são úteis em worker pools para evitar que producers bloqueiem enquanto todos os workers estão ocupados. Mas cuidado: buffers grandes podem mascarar problemas de backpressure.

Benchmarking Código Concorrente

Use testing.B com b.RunParallel para benchmarks concorrentes:

func BenchmarkWorkerPool(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            processItem(generateItem())
        }
    })
}

Para profiling detalhado, confira o artigo sobre GOMAXPROCS container-aware e como containers afetam a concorrência do Go.

Próximos Passos

Dominar padrões de concorrência é essencial para construir aplicações Go robustas e performáticas. Combine esses padrões com:

Para ver como outras linguagens abordam concorrência, compare com as coroutines do Kotlin, o modelo async/await do Rust ou o asyncio do Python.