Go e RabbitMQ: Mensageria Assíncrona Completa

Introdução

RabbitMQ é um dos brokers de mensagens mais populares do mundo, usado por empresas como Uber, Reddit e Stripe para processar bilhões de mensagens diariamente. Quando combinado com Go, criamos sistemas altamente performáticos, confiáveis e escaláveis.

Neste guia completo, você vai aprender desde os conceitos fundamentais até padrões avançados de mensageria com Go e RabbitMQ.

O que é RabbitMQ?

RabbitMQ é um message broker (corretor de mensagens) de código aberto que implementa o protocolo AMQP (Advanced Message Queuing Protocol). Ele atua como um intermediário entre aplicações, permitindo comunicação assíncrona e desacoplada.

Por que usar RabbitMQ com Go?

  • Alta performance: Go + RabbitMQ processa milhares de mensagens/segundo
  • Confiabilidade: Garantia de entrega, persistência e confirmações
  • Escalabilidade: Distribua carga entre múltiplos consumers
  • Desacoplamento: Produtores não precisam saber sobre consumidores
  • Resiliência: Filas persistem mesmo durante falhas

Instalação e Configuração

Instalando RabbitMQ

Docker (Recomendado para desenvolvimento)

docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin123 \
  rabbitmq:3-management

Acesse o painel de administração em http://localhost:15672 (admin/admin123).

Instalação Local (Ubuntu/Debian)

# Adicionar repositório
sudo apt-get install -y curl gnupg
curl -fsSL https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq-archive-keyring.gpg

# Instalar
echo "deb [signed-by=/usr/share/keyrings/rabbitmq-archive-keyring.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt-get update
sudo apt-get install -y rabbitmq-server

# Iniciar serviço
sudo systemctl start rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management

Instalando a Biblioteca Go AMQP

go get github.com/rabbitmq/amqp091-go

Conceitos Fundamentais

Antes de codificar, entenda estes conceitos essenciais:

ConceitoDescrição
ProducerAplicação que envia mensagens
ConsumerAplicação que recebe e processa mensagens
QueueFila que armazena mensagens
ExchangeRoteia mensagens para filas
BindingConexão entre exchange e queue
Routing KeyChave usada para rotear mensagens

Tipos de Exchange

┌─────────────┐     ┌──────────┐     ┌─────────┐
│  Producer   │────▶│ Exchange │────▶│  Queue  │
└─────────────┘     └──────────┘     └────┬────┘
                                    ┌───────────┐
                                    │  Consumer │
                                    └───────────┘
  1. Direct: Roteia por routing key exata
  2. Fanout: Broadcast para todas as filas ligadas
  3. Topic: Roteia por padrões de routing key (logs.error.*)
  4. Headers: Roteia por headers da mensagem

Conectando ao RabbitMQ

package main

import (
    "fmt"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func connectRabbitMQ() (*amqp.Connection, error) {
    // String de conexão AMQP
    // amqp://usuario:senha@host:porta/vhost
    conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
    if err != nil {
        return nil, fmt.Errorf("falha ao conectar ao RabbitMQ: %w", err)
    }
    return conn, nil
}

func main() {
    conn, err := connectRabbitMQ()
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    fmt.Println("✅ Conectado ao RabbitMQ com sucesso!")
}

Conexão com Retry

Em produção, sempre implemente retry para lidar com falhas temporárias:

func connectWithRetry(url string, maxRetries int) (*amqp.Connection, error) {
    var conn *amqp.Connection
    var err error

    for i := 0; i < maxRetries; i++ {
        conn, err = amqp.Dial(url)
        if err == nil {
            return conn, nil
        }

        log.Printf("Tentativa %d/%d falhou: %v. Retrying em 2s...", i+1, maxRetries, err)
        time.Sleep(2 * time.Second)
    }

    return nil, fmt.Errorf("falha após %d tentativas: %w", maxRetries, err)
}

Publicando Mensagens

Producer Simples

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Declarar fila (cria se não existir)
    q, err := ch.QueueDeclare(
        "hello",    // nome
        false,      // durable
        false,      // delete when unused
        false,      // exclusive
        false,      // no-wait
        nil,        // arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    body := "Olá, RabbitMQ!"
    err = ch.PublishWithContext(
        ctx,
        "",         // exchange
        q.Name,     // routing key
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        },
    )
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("✅ Mensagem enviada: %s\n", body)
}

Producer com Confirmações (Confirms)

Para garantir que mensagens chegaram ao broker:

func publishWithConfirm(ch *amqp.Channel, queueName string, body []byte) error {
    // Habilitar confirmações de publisher
    if err := ch.Confirm(false); err != nil {
        return fmt.Errorf("falha ao habilitar confirms: %w", err)
    }

    confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))

    err := ch.Publish(
        "",
        queueName,
        false,
        false,
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         body,
            DeliveryMode: amqp.Persistent, // Persiste em disco
        },
    )
    if err != nil {
        return err
    }

    // Aguardar confirmação
    select {
    case confirm := <-confirms:
        if confirm.Ack {
            return nil
        }
        return fmt.Errorf("mensagem rejeitada pelo broker")
    case <-time.After(5 * time.Second):
        return fmt.Errorf("timeout aguardando confirmação")
    }
}

Consumindo Mensagens

Consumer Básico

package main

import (
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Fatal(err)
    }

    log.Println("📥 Aguardando mensagens...")

    for msg := range msgs {
        log.Printf("✅ Mensagem recebida: %s\n", msg.Body)
    }
}

Consumer com Ack Manual (Recomendado)

O ack manual garante que mensagens só são removidas da fila após processamento bem-sucedido:

func consumeWithAck(ch *amqp.Channel, queueName string) error {
    msgs, err := ch.Consume(
        queueName,
        "go-consumer",
        false, // auto-ack = false (manual ack)
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    for msg := range msgs {
        // Processar mensagem
        if err := processMessage(msg.Body); err != nil {
            // Rejeitar e reencaminhar (nack)
            msg.Nack(false, true)
            log.Printf("❌ Erro processando mensagem: %v", err)
            continue
        }

        // Confirmar processamento bem-sucedido (ack)
        msg.Ack(false)
        log.Printf("✅ Mensagem processada: %s", msg.Body)
    }

    return nil
}

func processMessage(body []byte) error {
    // Sua lógica de processamento aqui
    log.Printf("Processando: %s", body)
    return nil
}

Exchanges e Routing Avançado

Exchange Direct (Roteamento Específico)

func setupDirectExchange(ch *amqp.Channel) error {
    // Declarar exchange do tipo direct
    err := ch.ExchangeDeclare(
        "logs_direct", // nome
        "direct",      // tipo
        true,          // durable
        false,         // auto-deleted
        false,         // internal
        false,         // no-wait
        nil,           // arguments
    )
    if err != nil {
        return err
    }

    // Criar fila para logs de erro
    q, err := ch.QueueDeclare(
        "error_logs",
        true,  // durable
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Binding: fila recebe mensagens com routing key "error"
    err = ch.QueueBind(
        q.Name,        // queue
        "error",       // routing key
        "logs_direct", // exchange
        false,
        nil,
    )

    return err
}

// Publicar com routing key específica
func publishLog(ch *amqp.Channel, level string, message []byte) error {
    return ch.Publish(
        "logs_direct",
        level, // routing key: "error", "warning", "info"
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        message,
        },
    )
}

Exchange Topic (Padrões de Roteamento)

Perfeito para sistemas de logs hierárquicos:

func setupTopicExchange(ch *amqp.Channel) error {
    err := ch.ExchangeDeclare(
        "logs_topic",
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Criar fila exclusiva
    q, err := ch.QueueDeclare(
        "",
        false,
        false,
        true,  // exclusive
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Binding com padrão wildcard
    // "kern.*" = todos os logs do kernel
    // "*.critical" = todos os logs críticos de qualquer facility
    // "#" = todos os logs
    patterns := []string{"kern.*", "*.critical"}
    for _, pattern := range patterns {
        err = ch.QueueBind(q.Name, pattern, "logs_topic", false, nil)
        if err != nil {
            return err
        }
    }

    return nil
}

Exchange Fanout (Broadcast)

func setupFanoutExchange(ch *amqp.Channel) error {
    err := ch.ExchangeDeclare(
        "notifications",
        "fanout",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Todas as filas ligadas recebem todas as mensagens
    queues := []string{"email_service", "push_service", "sms_service"}
    for _, queueName := range queues {
        q, err := ch.QueueDeclare(queueName, true, false, false, false, nil)
        if err != nil {
            return err
        }

        err = ch.QueueBind(q.Name, "", "notifications", false, nil)
        if err != nil {
            return err
        }
    }

    return nil
}

Padrões de Mensageria

Work Queues (Fila de Trabalho)

Distribua tarefas entre múltiplos workers:

// Configuração do consumer para prefetch
func setupWorker(ch *amqp.Channel) error {
    // Fair dispatch: cada worker recebe no máximo 1 mensagem por vez
    err := ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    if err != nil {
        return err
    }

    msgs, err := ch.Consume(
        "task_queue",
        "",
        false, // manual ack
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    for msg := range msgs {
        // Simular processamento demorado
        processTask(msg.Body)
        msg.Ack(false)
    }

    return nil
}

func processTask(body []byte) {
    log.Printf("Processando tarefa: %s", body)
    // Simular trabalho...
    time.Sleep(time.Second)
}

RPC (Remote Procedure Call)

// Servidor RPC
func startRPCServer(ch *amqp.Channel) error {
    q, err := ch.QueueDeclare(
        "rpc_queue",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
    if err != nil {
        return err
    }

    for msg := range msgs {
        go handleRPC(ch, msg)
    }

    return nil
}

func handleRPC(ch *amqp.Channel, msg amqp.Delivery) {
    // Processar requisição
    response := processRPCRequest(msg.Body)

    // Enviar resposta para a fila de reply
    ch.Publish(
        "",
        msg.ReplyTo,
        false,
        false,
        amqp.Publishing{
            ContentType:   "application/json",
            CorrelationId: msg.CorrelationId,
            Body:          response,
        },
    )

    msg.Ack(false)
}

Dead Letter Exchange (DLX)

Trate mensagens que falham repetidamente:

func setupWithDLX(ch *amqp.Channel) error {
    // Declarar exchange para mensagens mortas
    err := ch.ExchangeDeclare(
        "dlx",
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Fila para mensagens mortas
    _, err = ch.QueueDeclare(
        "dead_letter_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Fila principal com DLX configurado
    args := amqp.Table{
        "x-dead-letter-exchange":    "dlx",
        "x-dead-letter-routing-key": "failed",
        "x-message-ttl":             30000, // 30s TTL opcional
        "x-max-retries":             3,
    }

    _, err = ch.QueueDeclare(
        "main_queue",
        true,
        false,
        false,
        false,
        args,
    )

    return err
}

Tratamento de Erros

type RabbitMQError struct {
    Op   string
    Err  error
    Code int
}

func (e *RabbitMQError) Error() string {
    return fmt.Sprintf("rabbitmq %s: %v (code: %d)", e.Op, e.Err, e.Code)
}

// Função segura para reconectar
func safeConsume(conn *amqp.Connection, queueName string, handler func([]byte) error) {
    for {
        ch, err := conn.Channel()
        if err != nil {
            log.Printf("Erro ao criar canal: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }

        msgs, err := ch.Consume(queueName, "", false, false, false, false, nil)
        if err != nil {
            log.Printf("Erro ao iniciar consumer: %v", err)
            ch.Close()
            time.Sleep(5 * time.Second)
            continue
        }

        log.Println("✅ Consumer iniciado com sucesso")

        for msg := range msgs {
            if err := handler(msg.Body); err != nil {
                log.Printf("❌ Erro no handler: %v", err)
                msg.Nack(false, true) // requeue
            } else {
                msg.Ack(false)
            }
        }

        log.Println("⚠️  Canal fechado, reconectando...")
        ch.Close()
        time.Sleep(5 * time.Second)
    }
}

Boas Práticas

1. Connection e Channel Management

type RabbitMQClient struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    mu      sync.Mutex
}

func NewRabbitMQClient(url string) (*RabbitMQClient, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, err
    }

    ch, err := conn.Channel()
    if err != nil {
        conn.Close()
        return nil, err
    }

    return &RabbitMQClient{
        conn:    conn,
        channel: ch,
    }, nil
}

func (c *RabbitMQClient) Close() {
    c.channel.Close()
    c.conn.Close()
}

2. Configurações de Produção

func productionConfig() *amqp.Config {
    return &amqp.Config{
        Heartbeat: 10 * time.Second,
        Locale:    "en_US",
        Properties: amqp.Table{
            "connection_name": "go-app-production",
        },
    }
}

3. Logging e Observabilidade

func consumeWithMetrics(ch *amqp.Channel, queueName string) error {
    msgs, err := ch.Consume(queueName, "", false, false, false, false, nil)
    if err != nil {
        return err
    }

    for msg := range msgs {
        start := time.Now()

        err := processMessage(msg.Body)
        duration := time.Since(start)

        // Métricas
        if err != nil {
            log.Printf("[ERROR] queue=%s duration=%v error=%v", queueName, duration, err)
            msg.Nack(false, true)
        } else {
            log.Printf("[SUCCESS] queue=%s duration=%v", queueName, duration)
            msg.Ack(false)
        }
    }

    return nil
}

Exemplo Completo: Sistema de Processamento de Pedidos

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

type Order struct {
    ID        string    `json:"id"`
    Customer  string    `json:"customer"`
    Product   string    `json:"product"`
    Quantity  int       `json:"quantity"`
    Timestamp time.Time `json:"timestamp"`
}

func main() {
    conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Setup
    setupOrderProcessing(ch)

    // Iniciar consumer em goroutine
    go consumeOrders(ch)

    // Publicar algumas ordens de exemplo
    for i := 1; i <= 5; i++ {
        order := Order{
            ID:        fmt.Sprintf("ORD-%d", i),
            Customer:  fmt.Sprintf("Cliente %d", i),
            Product:   "Produto A",
            Quantity:  i * 2,
            Timestamp: time.Now(),
        }

        if err := publishOrder(ch, order); err != nil {
            log.Printf("Erro ao publicar: %v", err)
        }

        time.Sleep(time.Second)
    }

    // Manter rodando
    select {}
}

func setupOrderProcessing(ch *amqp.Channel) error {
    // Exchange para pedidos
    err := ch.ExchangeDeclare(
        "orders",
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Fila de pedidos
    _, err = ch.QueueDeclare(
        "order_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    return ch.QueueBind("order_queue", "new", "orders", false, nil)
}

func publishOrder(ch *amqp.Channel, order Order) error {
    body, err := json.Marshal(order)
    if err != nil {
        return err
    }

    return ch.Publish(
        "orders",
        "new",
        false,
        false,
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         body,
            DeliveryMode: amqp.Persistent,
            Timestamp:    time.Now(),
        },
    )
}

func consumeOrders(ch *amqp.Channel) {
    msgs, err := ch.Consume(
        "order_queue",
        "order-processor",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    for msg := range msgs {
        var order Order
        if err := json.Unmarshal(msg.Body, &order); err != nil {
            log.Printf("Erro ao decodificar: %v", err)
            msg.Nack(false, false) // descartar
            continue
        }

        log.Printf("📦 Processando pedido %s de %s", order.ID, order.Customer)

        // Simular processamento
        time.Sleep(500 * time.Millisecond)

        log.Printf("✅ Pedido %s processado com sucesso!", order.ID)
        msg.Ack(false)
    }
}

Próximos Passos

Agora que você domina RabbitMQ com Go, explore estes tópicos avançados:

  1. Go e Kafka: Processamento de Streaming - Para alto throughput
  2. Go e Redis: Cache e Session Store - Complemente com cache
  3. Go Concurrency Patterns - Goroutines avançadas
  4. Go Observability: Logs, Métricas e Traces - Monitore sua mensageria

FAQ

Qual a diferença entre RabbitMQ e Kafka?

RabbitMQ é melhor para mensageria complexa com routing avançado, enquanto Kafka é ideal para streaming de eventos com alto throughput e retenção de longo prazo.

Como escalo consumers em RabbitMQ?

Simplesmente inicie múltiplas instâncias do mesmo consumer. RabbitMQ distribui mensagens automaticamente usando round-robin.

RabbitMQ é adequado para microserviços?

Sim! RabbitMQ é excelente para comunicação assíncrona entre microserviços, proporcionando desacoplamento e resiliência.

Como garanto que mensagens não sejam perdidas?

Use: filas duráveis (durable=true), mensagens persistentes (DeliveryMode: amqp.Persistent), e confirmações de publisher (Confirm).

Posso usar RabbitMQ em produção com Go?

Absolutamente! Empresas como Uber, Reddit e Mercado Livre usam RabbitMQ em produção com milhões de mensagens diárias.


Referências: