← Voltar para o blog

Redis Streams em Go: Filas e Consumer Groups

Aprenda Redis Streams em Go para filas internas: XADD, consumer groups, ACK, retries, pending messages, idempotência e cuidados de produção.

Redis Streams em Go é uma opção prática quando você precisa de uma fila interna, já usa Redis na stack e quer algo mais estruturado que uma lista com LPUSH/BRPOP. Com Streams, cada mensagem recebe um ID ordenado, pode ser consumida por grupos, reconhecida com ACK, reprocessada quando um worker cai e inspecionada durante incidentes. Isso aproxima Redis de um broker leve, sem exigir Kafka, RabbitMQ ou SQS logo no primeiro dia.

O cuidado é não tratar Redis Streams como mágica. Redis continua sendo um banco em memória com persistência configurável, política de eviction, limites de RAM e comportamento operacional próprio. Para jobs internos, integrações de baixo a médio volume, pipelines de produto e filas de background, ele pode ser excelente. Para eventos financeiros críticos, contratos entre muitos times ou retenção longa para replay, talvez você precise de outro broker ou de uma combinação com outbox pattern em Go.

Este guia mostra como usar Redis Streams em Go com XADD, XREADGROUP, consumer groups, ACK, retries, mensagens pendentes, idempotência e observabilidade. Ele complementa os artigos de worker pool em Go, mensageria em Go e idempotência, retry e DLQ em Go.

Quando Redis Streams faz sentido

Redis Streams costuma funcionar bem quando a fila pertence ao mesmo produto e o time já opera Redis com maturidade. Exemplos comuns:

  • Processar e-mails, notificações, webhooks recebidos ou tarefas de baixa latência.
  • Distribuir jobs entre múltiplos workers sem criar um broker novo.
  • Manter histórico curto de eventos para diagnóstico.
  • Reprocessar mensagens que ficaram pendentes porque um worker caiu.
  • Evoluir uma fila simples antes de migrar para RabbitMQ, NATS, SQS ou Kafka.

O ponto forte é a simplicidade operacional em stacks pequenas. Você não precisa provisionar outro serviço, aprender uma topologia complexa ou explicar ao time por que um broker pesado entrou no sistema. O ponto fraco é justamente esse: por ser fácil começar, muita gente esquece de definir retenção, memória, persistência, monitoramento e idempotência.

Se a mensagem não pode ser perdida em hipótese alguma, faça uma análise honesta. Redis com AOF, replica, backup e configuração correta pode ser confiável para muitos cenários, mas não substitui automaticamente um log durável como Kafka nem um serviço gerenciado com garantias específicas como SQS. A decisão deve considerar impacto de negócio, volume, tempo de retenção, custo de operação e experiência do time.

Conceitos básicos: stream, entry e grupo

Um stream é uma sequência ordenada de entradas. Cada entrada tem um ID, normalmente gerado pelo Redis, e um conjunto de campos. Ao inserir uma mensagem com XADD, você cria algo como:

1700000000000-0 order_id=123 event=paid amount=19990

O ID combina timestamp e sequência. Isso ajuda na ordenação e permite ler a partir de um ponto específico. Para consumo simples, um processo pode usar XREAD. Para fila com múltiplos workers, use consumer groups com XREADGROUP.

Consumer group é o recurso que permite dividir trabalho. O grupo registra quais mensagens já foram entregues, quais estão pendentes e quais foram reconhecidas. Cada worker se identifica como um consumidor dentro do grupo. Quando uma mensagem é processada com sucesso, o worker chama XACK. Se o worker morre antes do ACK, a mensagem fica no pending entries list e pode ser recuperada depois.

Essa semântica é parecida com entrega “ao menos uma vez”. Ela reduz perda, mas abre espaço para duplicidade. Por isso, todo handler sério precisa ser idempotente.

Publicando mensagens com go-redis

O pacote github.com/redis/go-redis/v9 é uma escolha comum para Redis moderno em Go. Um publisher básico pode ficar assim:

package jobs

import (
    "context"
    "time"

    "github.com/redis/go-redis/v9"
)

type Publisher struct {
    Redis  *redis.Client
    Stream string
}

func (p Publisher) PublishEmail(ctx context.Context, userID, template string) (string, error) {
    return p.Redis.XAdd(ctx, &redis.XAddArgs{
        Stream: p.Stream,
        MaxLen: 100_000,
        Approx: true,
        Values: map[string]any{
            "type":       "send_email",
            "user_id":    userID,
            "template":   template,
            "created_at": time.Now().UTC().Format(time.RFC3339),
        },
    }).Result()
}

O MaxLen com Approx: true evita crescimento infinito. Sem limite, um stream pode virar um vazamento de memória disfarçado. O número certo depende do volume, da janela de replay desejada e da capacidade do Redis. Comece com retenção explícita e revise com métricas reais.

Também vale incluir campos de negócio que ajudem na idempotência: event_id, order_id, webhook_id, job_id ou outro identificador único. O ID do Redis é útil para fila, mas geralmente não é o melhor identificador de efeito no banco.

Criando o consumer group

Antes de consumir, crie o grupo. Em deploys reais, esse passo pode rodar no bootstrap do worker com tratamento para o erro de grupo existente:

func EnsureGroup(ctx context.Context, rdb *redis.Client, stream, group string) error {
    err := rdb.XGroupCreateMkStream(ctx, stream, group, "0").Err()
    if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
        return err
    }
    return nil
}

O MkStream cria o stream se ele ainda não existir. O ID 0 faz o grupo considerar mensagens antigas desde o começo. Para grupos que devem consumir apenas mensagens novas, use $. Em produção, pense nisso antes do deploy: começar em 0 pode disparar reprocessamento grande; começar em $ pode ignorar mensagens publicadas antes do worker subir.

Consumindo com XREADGROUP

Um worker pode ficar em loop, lendo mensagens novas com >:

func Consume(ctx context.Context, rdb *redis.Client, stream, group, consumer string) error {
    for {
        res, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    group,
            Consumer: consumer,
            Streams:  []string{stream, ">"},
            Count:    10,
            Block:    5 * time.Second,
        }).Result()
        if err == redis.Nil {
            continue
        }
        if err != nil {
            return err
        }

        for _, s := range res {
            for _, msg := range s.Messages {
                if err := handle(ctx, msg.Values); err != nil {
                    // Sem ACK: a mensagem fica pendente para retry posterior.
                    continue
                }
                if err := rdb.XAck(ctx, stream, group, msg.ID).Err(); err != nil {
                    return err
                }
            }
        }
    }
}

Esse exemplo é propositalmente simples. Em produção, registre logs estruturados, métricas por tipo de job, timeout por mensagem e tratamento de shutdown gracioso. Se o processo receber SIGTERM, pare de buscar mensagens novas, termine as mensagens em andamento dentro de um prazo e só então encerre. O guia de graceful shutdown em Go cobre essa base.

Não dê ACK antes do efeito principal terminar. Se você reconhece a mensagem e depois falha ao gravar no banco, perdeu o job. Também não faça o contrário sem idempotência: se grava no banco e cai antes do ACK, a mensagem volta e o efeito pode duplicar. A solução é guardar uma chave idempotente no banco ou no próprio Redis com TTL coerente.

Reprocessando mensagens pendentes

Quando um worker recebe mensagem e não dá ACK, ela entra na lista de pendentes do grupo. Isso acontece por bug, crash, deploy, timeout ou queda de rede. Redis oferece comandos como XPENDING, XCLAIM e XAUTOCLAIM para encontrar e assumir mensagens antigas.

Uma rotina de recovery pode procurar mensagens paradas há mais de alguns minutos e transferi-las para o consumidor atual:

res, err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
    Stream:   stream,
    Group:    group,
    Consumer: consumer,
    MinIdle:  10 * time.Minute,
    Start:    "0-0",
    Count:    20,
}).Result()

Depois de assumir a mensagem, processe com a mesma função idempotente e dê ACK se concluir. Não transforme recovery em loop infinito. Registre quantas vezes a mensagem foi entregue, quando foi criada e por que falhou. Quando um job excede tentativas razoáveis, mova para uma stream de erro, registre o motivo e alerte alguém.

Redis Streams não tem DLQ automática igual a algumas configurações de broker. Você precisa implementar a política: por exemplo, jobs:email:failed para mensagens que passaram de cinco tentativas. Isso ajuda suporte e engenharia a distinguir atraso temporário de payload inválido.

Idempotência: o detalhe que protege produção

Com filas “ao menos uma vez”, duplicidade é normal. O handler deve poder receber a mesma mensagem duas vezes sem cobrar duas vezes, enviar dois e-mails críticos ou criar dois registros incompatíveis.

Uma estratégia comum é gravar uma tabela de processamento:

CREATE TABLE processed_jobs (
    job_key TEXT PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

Antes do efeito principal, tente inserir job_key. Se a inserção falhar por chave duplicada, o job já foi processado e você pode dar ACK. Para alguns casos, o próprio efeito já é idempotente: atualizar status para paid, recalcular agregados ou fazer upsert por chave natural. O importante é ser intencional.

Evite usar apenas lock em memória. Um lock local não protege múltiplos pods, deploys ou crashes. Redis SET NX pode ajudar para deduplicação curta, mas bancos transacionais continuam sendo melhores quando o efeito de negócio também está no banco.

Observabilidade e operação

Uma fila sem métrica vira caixa-preta. No mínimo, acompanhe:

  • Tamanho do stream.
  • Quantidade de mensagens pendentes por grupo.
  • Idade da mensagem mais antiga pendente.
  • Taxa de publicação, processamento, erro e retry.
  • Tempo médio e p95 de processamento por tipo de job.
  • Uso de memória do Redis e política de eviction.

Essas métricas indicam backlog, worker travado, payload problemático e limite de capacidade. Combine com logs estruturados usando slog em Go e traces quando o job chama APIs externas. Se a fila participa de uma API, use também OpenTelemetry em Go para entender o caminho completo.

Também documente o runbook: como pausar publishers, aumentar workers, reprocessar pendentes, mover para stream de falha, limpar retenção e restaurar Redis. Em times brasileiros pequenos, esse runbook muitas vezes vale mais que trocar de ferramenta.

Redis Streams, RabbitMQ, Kafka ou SQS?

Use Redis Streams quando a fila é interna, o volume é moderado, a retenção é curta e o time já opera Redis bem. Use RabbitMQ quando você precisa de filas clássicas, routing e DLQ com operação dedicada. Use Kafka quando o evento precisa de retenção longa, replay e múltiplos consumidores independentes. Use SQS quando você quer fila gerenciada na AWS com baixa operação. NATS JetStream pode ser uma boa alternativa quando simplicidade, baixa latência e ecossistema cloud native pesam mais.

Para uma comparação ampla, veja mensageria em Go: RabbitMQ, Kafka, NATS, SQS ou Redis Streams. Para quem também trabalha com Python, o guia de Celery em Python mostra outra forma comum de organizar workers, retries e filas em aplicações brasileiras.

Checklist para produção

Antes de colocar Redis Streams em produção com Go, revise:

  • O stream tem retenção (MAXLEN) ou política de trimming clara.
  • Redis está configurado com persistência, backup e memória compatíveis com a criticidade.
  • Cada mensagem tem uma chave idempotente de negócio.
  • Workers dão ACK apenas depois do efeito principal.
  • Existe rotina para pendentes antigos com limite de tentativas.
  • Mensagens inválidas vão para uma stream de falha ou processo equivalente.
  • Métricas expõem backlog, pendentes, idade, erros e throughput.
  • Deploy e shutdown não interrompem jobs no meio sem recuperação.

Redis Streams não é a resposta para toda mensageria em Go, mas é uma ferramenta útil quando usada com limites claros. O melhor uso é começar simples sem fingir que simplicidade elimina operação. Se a fila move trabalho real, trate idempotência, retenção, recovery e observabilidade como parte do design, não como ajustes depois do primeiro incidente.