← Voltar para o blog

Worker Pool em Go: Filas de Jobs na Prática

Aprenda worker pool em Go para filas de jobs: goroutines, channels, context, retries, backpressure, shutdown gracioso e quando usar RabbitMQ ou Kafka.

Worker pool em Go é um dos padrões mais úteis para transformar concorrência em trabalho controlado. Em vez de disparar uma goroutine para cada tarefa e torcer para o sistema aguentar, você define um número fixo de workers, envia jobs por um channel e controla consumo de CPU, memória, conexões e chamadas externas.

Esse padrão aparece em APIs que precisam processar imagens, enviar e-mails, recalcular relatórios, consumir webhooks, sincronizar dados, enriquecer cadastros, executar ETLs pequenos ou chamar serviços externos com limite de taxa. Também é uma ponte natural entre Go básico e sistemas de produção: você usa goroutines e channels, mas precisa pensar em cancelamento, retries, backpressure, observabilidade e shutdown gracioso.

Este guia mostra uma implementação prática de worker pool em Go, explica onde ela funciona bem, onde ela quebra e quando vale sair do channel local para RabbitMQ, Kafka, NATS, SQS ou outro broker. Se você ainda está montando a base, leia também o tutorial de concorrência em Go e o guia de API REST em Go.

O problema: goroutine infinita não é arquitetura

Go torna goroutines baratas, mas “barato” não significa “infinito”. O erro comum é escrever algo assim:

for _, item := range itens {
    go processar(item)
}

Para 100 itens, talvez funcione. Para 100 mil, você acabou de criar uma explosão de goroutines competindo por memória, conexões de banco, sockets HTTP e tempo de CPU. Se cada job chama uma API externa, você pode derrubar o fornecedor, tomar 429 ou travar sua própria fila de conexões.

Um worker pool resolve esse problema colocando um limite explícito de paralelismo. Em vez de cada item virar uma goroutine, os jobs entram em uma fila e um número fixo de workers consome essa fila.

Estrutura mínima de um worker pool

A versão mais simples usa três peças:

  • Um tipo Job, com os dados necessários para processar a tarefa.
  • Um channel jobs, usado como fila interna.
  • N workers, cada um rodando em uma goroutine e lendo do channel.
package main

import (
    "context"
    "fmt"
    "log/slog"
    "sync"
    "time"
)

type Job struct {
    ID      string
    Payload string
}

type WorkerPool struct {
    workers int
    jobs    chan Job
    logger  *slog.Logger
}

func NewWorkerPool(workers int, buffer int, logger *slog.Logger) *WorkerPool {
    return &WorkerPool{
        workers: workers,
        jobs:    make(chan Job, buffer),
        logger:  logger,
    }
}

func (p *WorkerPool) Start(ctx context.Context) *sync.WaitGroup {
    var wg sync.WaitGroup

    for i := 1; i <= p.workers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            p.runWorker(ctx, workerID)
        }(i)
    }

    return &wg
}

func (p *WorkerPool) runWorker(ctx context.Context, workerID int) {
    for {
        select {
        case <-ctx.Done():
            p.logger.Info("worker encerrado por contexto", slog.Int("worker_id", workerID))
            return
        case job, ok := <-p.jobs:
            if !ok {
                p.logger.Info("worker encerrado: fila fechada", slog.Int("worker_id", workerID))
                return
            }

            started := time.Now()
            if err := processar(ctx, job); err != nil {
                p.logger.Error("falha ao processar job",
                    slog.Int("worker_id", workerID),
                    slog.String("job_id", job.ID),
                    slog.Duration("duration", time.Since(started)),
                    slog.Any("err", err),
                )
                continue
            }

            p.logger.Info("job processado",
                slog.Int("worker_id", workerID),
                slog.String("job_id", job.ID),
                slog.Duration("duration", time.Since(started)),
            )
        }
    }
}

func (p *WorkerPool) Enqueue(ctx context.Context, job Job) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case p.jobs <- job:
        return nil
    }
}

func (p *WorkerPool) Close() {
    close(p.jobs)
}

func processar(ctx context.Context, job Job) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(150 * time.Millisecond):
        fmt.Println("processado", job.ID)
        return nil
    }
}

Esse código já cobre o essencial: workers fixos, fila com buffer, cancelamento via context.Context, logging estruturado com slog e encerramento quando o channel fecha. Para logs em produção, combine com o guia de slog em Go.

Como usar no main

O main deve iniciar o pool, enviar jobs e esperar os workers terminarem. Em serviço real, o ctx viria de sinal do sistema (SIGTERM) ou do ciclo de vida do servidor HTTP.

func main() {
    logger := slog.Default()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    pool := NewWorkerPool(5, 100, logger)
    wg := pool.Start(ctx)

    for i := 1; i <= 20; i++ {
        job := Job{
            ID:      fmt.Sprintf("job-%02d", i),
            Payload: "dados",
        }

        if err := pool.Enqueue(ctx, job); err != nil {
            logger.Error("erro ao enfileirar job", slog.Any("err", err))
        }
    }

    pool.Close()
    wg.Wait()
}

O número de workers não deve ser escolhido no chute. Para tarefas CPU-bound, comece perto de runtime.NumCPU(). Para tarefas I/O-bound, como HTTP e banco, você pode usar mais workers, mas o limite real costuma ser conexão, latência, rate limit e orçamento do serviço externo.

Backpressure: o detalhe que salva produção

O buffer do channel é uma forma simples de backpressure. Se a fila está cheia, Enqueue bloqueia até haver espaço ou até o contexto ser cancelado. Isso é bom: o sistema está dizendo que não consegue absorver mais trabalho naquela velocidade.

Em uma API HTTP, você talvez não queira bloquear indefinidamente. Uma variação comum é usar timeout curto:

func (p *WorkerPool) EnqueueWithTimeout(job Job, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    select {
    case p.jobs <- job:
        return nil
    case <-ctx.Done():
        return fmt.Errorf("fila cheia: %w", ctx.Err())
    }
}

Com isso, a API pode responder 503 Service Unavailable ou 429 Too Many Requests em vez de aceitar trabalho que não vai processar a tempo. Essa decisão é importante para sistemas de backend Go, especialmente quando há SLA, fila de e-mail, pagamento ou webhook envolvido.

Retries sem virar loop infinito

Retries são necessários, mas perigosos. Se todo erro gera retry imediato, uma instabilidade externa vira tempestade. O mínimo saudável é limitar tentativas, aplicar backoff e separar erro temporário de erro permanente.

type Job struct {
    ID       string
    Payload  string
    Attempts int
}

func retryDelay(attempt int) time.Duration {
    switch attempt {
    case 0:
        return 500 * time.Millisecond
    case 1:
        return 2 * time.Second
    default:
        return 10 * time.Second
    }
}

Em worker pool local, retry com time.Sleep dentro do worker é simples, mas reduz throughput porque o worker fica parado. Em produção pesada, prefira reenfileirar com atraso em um broker que suporte delay, dead-letter queue ou agendamento. RabbitMQ com DLX, SQS com visibility timeout e NATS JetStream são exemplos comuns.

Quando usar channel local e quando usar broker

Channel local é ótimo quando:

  • O trabalho pode ser perdido se o processo reiniciar.
  • A fila é curta e vive dentro de uma única instância.
  • Você quer limitar paralelismo dentro de uma requisição, CLI ou worker único.
  • A tarefa é derivada de outra fonte durável, como banco ou arquivo.

Use broker quando:

  • O job não pode sumir em deploy, crash ou restart.
  • Há várias instâncias consumindo a mesma fila.
  • Você precisa de retry durável, dead-letter queue ou auditoria.
  • O produtor e o consumidor são serviços diferentes.
  • O volume exige particionamento, replay ou retenção.

No ecossistema Go, RabbitMQ costuma ser direto para filas clássicas de trabalho. Veja o tutorial de Go com RabbitMQ para um caminho prático. Kafka faz mais sentido quando o problema é streaming, retenção e consumo por múltiplos grupos; para isso, veja Go com Kafka.

Observabilidade: métricas que importam

Um worker pool sem métrica vira caixa-preta. No mínimo, acompanhe:

  • Tamanho atual da fila.
  • Jobs processados com sucesso.
  • Jobs com erro, por tipo.
  • Duração de processamento.
  • Número de retries.
  • Jobs descartados por timeout ou fila cheia.
  • Tempo de shutdown.

Se o site ou API já usa Prometheus, exporte contadores e histogramas. O tutorial de Go com Prometheus mostra como instrumentar handlers; o mesmo raciocínio vale para workers. Para investigação de gargalos, combine logs estruturados, métricas e profiling com pprof em Go.

Shutdown gracioso em deploy

O erro clássico em deploy é receber SIGTERM, matar o processo e perder jobs em andamento. Em Kubernetes, systemd ou Cloud Run, você normalmente recebe alguns segundos para encerrar. Use esse tempo para parar de aceitar jobs novos, fechar a fila e esperar os workers terminarem.

pool.Close()

done := make(chan struct{})
go func() {
    wg.Wait()
    close(done)
}()

select {
case <-done:
    logger.Info("todos os workers finalizaram")
case <-time.After(20 * time.Second):
    logger.Warn("shutdown excedeu timeout")
}

Se o job é durável em broker, o worker pode simplesmente não confirmar a mensagem quando o contexto cancela; o broker entrega de novo depois. Se o job está apenas em memory channel, fechar o processo perde o que ainda não foi processado. Essa diferença deve orientar a arquitetura.

Erros comuns em worker pools Go

Os bugs mais frequentes são previsíveis:

  1. Fechar channel do lado errado: quem produz e controla o ciclo de vida fecha. Worker consumidor não deve fechar a fila compartilhada.
  2. Ignorar context.Context: sem cancelamento, shutdown trava e deploy fica lento.
  3. Criar worker demais: mais workers podem piorar latência por saturar banco, CPU ou rede.
  4. Não medir fila: se você não sabe o backlog, não sabe se o sistema está saudável.
  5. Retry infinito: erro permanente precisa morrer em dead-letter, não rodar para sempre.
  6. Misturar regra de negócio com infraestrutura: worker deve orquestrar; processamento deve ficar em função testável.

Esses erros também aparecem em entrevistas. Se você está se preparando para vagas, worker pools são um ótimo tema para explicar concorrência, trade-offs e produção. Depois deste guia, revise 50 perguntas de entrevista Go e acompanhe vagas Go no Brasil. Para quem ainda está entrando no mercado tech e quer comparar oportunidades de estágio ou júnior em outras stacks, o portal eu.dev.br reúne vagas brasileiras de entrada em tecnologia.

Conclusão

Worker pool é simples no código e profundo na operação. A ideia básica — N workers lendo de um channel — resolve o excesso de goroutines e dá controle de paralelismo. O que separa exemplo de produção é o restante: contexto, backpressure, retry limitado, logs, métricas, shutdown e clareza sobre durabilidade.

Use channel local quando o trabalho é efêmero ou derivado de outra fonte. Use broker quando o job precisa sobreviver a restart, escalar entre instâncias ou carregar histórico. Em ambos os casos, Go é uma excelente escolha porque goroutines, channels e context.Context tornam o modelo direto, legível e eficiente.