Go e RabbitMQ: Mensageria Assíncrona Completa
Introdução
RabbitMQ é um dos brokers de mensagens mais populares do mundo, usado por empresas como Uber, Reddit e Stripe para processar bilhões de mensagens diariamente. Quando combinado com Go, criamos sistemas altamente performáticos, confiáveis e escaláveis.
Neste guia completo, você vai aprender desde os conceitos fundamentais até padrões avançados de mensageria com Go e RabbitMQ.
O que é RabbitMQ?
RabbitMQ é um message broker (corretor de mensagens) de código aberto que implementa o protocolo AMQP (Advanced Message Queuing Protocol). Ele atua como um intermediário entre aplicações, permitindo comunicação assíncrona e desacoplada.
Por que usar RabbitMQ com Go?
- Alta performance: Go + RabbitMQ processa milhares de mensagens/segundo
- Confiabilidade: Garantia de entrega, persistência e confirmações
- Escalabilidade: Distribua carga entre múltiplos consumers
- Desacoplamento: Produtores não precisam saber sobre consumidores
- Resiliência: Filas persistem mesmo durante falhas
Instalação e Configuração
Instalando RabbitMQ
Docker (Recomendado para desenvolvimento)
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
rabbitmq:3-management
Acesse o painel de administração em http://localhost:15672 (admin/admin123).
Instalação Local (Ubuntu/Debian)
# Adicionar repositório
sudo apt-get install -y curl gnupg
curl -fsSL https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq-archive-keyring.gpg
# Instalar
echo "deb [signed-by=/usr/share/keyrings/rabbitmq-archive-keyring.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt-get update
sudo apt-get install -y rabbitmq-server
# Iniciar serviço
sudo systemctl start rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
Instalando a Biblioteca Go AMQP
go get github.com/rabbitmq/amqp091-go
Conceitos Fundamentais
Antes de codificar, entenda estes conceitos essenciais:
| Conceito | Descrição |
|---|---|
| Producer | Aplicação que envia mensagens |
| Consumer | Aplicação que recebe e processa mensagens |
| Queue | Fila que armazena mensagens |
| Exchange | Roteia mensagens para filas |
| Binding | Conexão entre exchange e queue |
| Routing Key | Chave usada para rotear mensagens |
Tipos de Exchange
┌─────────────┐ ┌──────────┐ ┌─────────┐
│ Producer │────▶│ Exchange │────▶│ Queue │
└─────────────┘ └──────────┘ └────┬────┘
│
▼
┌───────────┐
│ Consumer │
└───────────┘
- Direct: Roteia por routing key exata
- Fanout: Broadcast para todas as filas ligadas
- Topic: Roteia por padrões de routing key (
logs.error.*) - Headers: Roteia por headers da mensagem
Conectando ao RabbitMQ
package main
import (
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func connectRabbitMQ() (*amqp.Connection, error) {
// String de conexão AMQP
// amqp://usuario:senha@host:porta/vhost
conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
if err != nil {
return nil, fmt.Errorf("falha ao conectar ao RabbitMQ: %w", err)
}
return conn, nil
}
func main() {
conn, err := connectRabbitMQ()
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Println("✅ Conectado ao RabbitMQ com sucesso!")
}
Conexão com Retry
Em produção, sempre implemente retry para lidar com falhas temporárias:
func connectWithRetry(url string, maxRetries int) (*amqp.Connection, error) {
var conn *amqp.Connection
var err error
for i := 0; i < maxRetries; i++ {
conn, err = amqp.Dial(url)
if err == nil {
return conn, nil
}
log.Printf("Tentativa %d/%d falhou: %v. Retrying em 2s...", i+1, maxRetries, err)
time.Sleep(2 * time.Second)
}
return nil, fmt.Errorf("falha após %d tentativas: %w", maxRetries, err)
}
Publicando Mensagens
Producer Simples
package main
import (
"context"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
// Declarar fila (cria se não existir)
q, err := ch.QueueDeclare(
"hello", // nome
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := "Olá, RabbitMQ!"
err = ch.PublishWithContext(
ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
if err != nil {
log.Fatal(err)
}
fmt.Printf("✅ Mensagem enviada: %s\n", body)
}
Producer com Confirmações (Confirms)
Para garantir que mensagens chegaram ao broker:
func publishWithConfirm(ch *amqp.Channel, queueName string, body []byte) error {
// Habilitar confirmações de publisher
if err := ch.Confirm(false); err != nil {
return fmt.Errorf("falha ao habilitar confirms: %w", err)
}
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
err := ch.Publish(
"",
queueName,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent, // Persiste em disco
},
)
if err != nil {
return err
}
// Aguardar confirmação
select {
case confirm := <-confirms:
if confirm.Ack {
return nil
}
return fmt.Errorf("mensagem rejeitada pelo broker")
case <-time.After(5 * time.Second):
return fmt.Errorf("timeout aguardando confirmação")
}
}
Consumindo Mensagens
Consumer Básico
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatal(err)
}
log.Println("📥 Aguardando mensagens...")
for msg := range msgs {
log.Printf("✅ Mensagem recebida: %s\n", msg.Body)
}
}
Consumer com Ack Manual (Recomendado)
O ack manual garante que mensagens só são removidas da fila após processamento bem-sucedido:
func consumeWithAck(ch *amqp.Channel, queueName string) error {
msgs, err := ch.Consume(
queueName,
"go-consumer",
false, // auto-ack = false (manual ack)
false,
false,
false,
nil,
)
if err != nil {
return err
}
for msg := range msgs {
// Processar mensagem
if err := processMessage(msg.Body); err != nil {
// Rejeitar e reencaminhar (nack)
msg.Nack(false, true)
log.Printf("❌ Erro processando mensagem: %v", err)
continue
}
// Confirmar processamento bem-sucedido (ack)
msg.Ack(false)
log.Printf("✅ Mensagem processada: %s", msg.Body)
}
return nil
}
func processMessage(body []byte) error {
// Sua lógica de processamento aqui
log.Printf("Processando: %s", body)
return nil
}
Exchanges e Routing Avançado
Exchange Direct (Roteamento Específico)
func setupDirectExchange(ch *amqp.Channel) error {
// Declarar exchange do tipo direct
err := ch.ExchangeDeclare(
"logs_direct", // nome
"direct", // tipo
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
// Criar fila para logs de erro
q, err := ch.QueueDeclare(
"error_logs",
true, // durable
false,
false,
false,
nil,
)
if err != nil {
return err
}
// Binding: fila recebe mensagens com routing key "error"
err = ch.QueueBind(
q.Name, // queue
"error", // routing key
"logs_direct", // exchange
false,
nil,
)
return err
}
// Publicar com routing key específica
func publishLog(ch *amqp.Channel, level string, message []byte) error {
return ch.Publish(
"logs_direct",
level, // routing key: "error", "warning", "info"
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
}
Exchange Topic (Padrões de Roteamento)
Perfeito para sistemas de logs hierárquicos:
func setupTopicExchange(ch *amqp.Channel) error {
err := ch.ExchangeDeclare(
"logs_topic",
"topic",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// Criar fila exclusiva
q, err := ch.QueueDeclare(
"",
false,
false,
true, // exclusive
false,
nil,
)
if err != nil {
return err
}
// Binding com padrão wildcard
// "kern.*" = todos os logs do kernel
// "*.critical" = todos os logs críticos de qualquer facility
// "#" = todos os logs
patterns := []string{"kern.*", "*.critical"}
for _, pattern := range patterns {
err = ch.QueueBind(q.Name, pattern, "logs_topic", false, nil)
if err != nil {
return err
}
}
return nil
}
Exchange Fanout (Broadcast)
func setupFanoutExchange(ch *amqp.Channel) error {
err := ch.ExchangeDeclare(
"notifications",
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// Todas as filas ligadas recebem todas as mensagens
queues := []string{"email_service", "push_service", "sms_service"}
for _, queueName := range queues {
q, err := ch.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
return err
}
err = ch.QueueBind(q.Name, "", "notifications", false, nil)
if err != nil {
return err
}
}
return nil
}
Padrões de Mensageria
Work Queues (Fila de Trabalho)
Distribua tarefas entre múltiplos workers:
// Configuração do consumer para prefetch
func setupWorker(ch *amqp.Channel) error {
// Fair dispatch: cada worker recebe no máximo 1 mensagem por vez
err := ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
return err
}
msgs, err := ch.Consume(
"task_queue",
"",
false, // manual ack
false,
false,
false,
nil,
)
if err != nil {
return err
}
for msg := range msgs {
// Simular processamento demorado
processTask(msg.Body)
msg.Ack(false)
}
return nil
}
func processTask(body []byte) {
log.Printf("Processando tarefa: %s", body)
// Simular trabalho...
time.Sleep(time.Second)
}
RPC (Remote Procedure Call)
// Servidor RPC
func startRPCServer(ch *amqp.Channel) error {
q, err := ch.QueueDeclare(
"rpc_queue",
false,
false,
false,
false,
nil,
)
if err != nil {
return err
}
msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
if err != nil {
return err
}
for msg := range msgs {
go handleRPC(ch, msg)
}
return nil
}
func handleRPC(ch *amqp.Channel, msg amqp.Delivery) {
// Processar requisição
response := processRPCRequest(msg.Body)
// Enviar resposta para a fila de reply
ch.Publish(
"",
msg.ReplyTo,
false,
false,
amqp.Publishing{
ContentType: "application/json",
CorrelationId: msg.CorrelationId,
Body: response,
},
)
msg.Ack(false)
}
Dead Letter Exchange (DLX)
Trate mensagens que falham repetidamente:
func setupWithDLX(ch *amqp.Channel) error {
// Declarar exchange para mensagens mortas
err := ch.ExchangeDeclare(
"dlx",
"direct",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// Fila para mensagens mortas
_, err = ch.QueueDeclare(
"dead_letter_queue",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// Fila principal com DLX configurado
args := amqp.Table{
"x-dead-letter-exchange": "dlx",
"x-dead-letter-routing-key": "failed",
"x-message-ttl": 30000, // 30s TTL opcional
"x-max-retries": 3,
}
_, err = ch.QueueDeclare(
"main_queue",
true,
false,
false,
false,
args,
)
return err
}
Tratamento de Erros
type RabbitMQError struct {
Op string
Err error
Code int
}
func (e *RabbitMQError) Error() string {
return fmt.Sprintf("rabbitmq %s: %v (code: %d)", e.Op, e.Err, e.Code)
}
// Função segura para reconectar
func safeConsume(conn *amqp.Connection, queueName string, handler func([]byte) error) {
for {
ch, err := conn.Channel()
if err != nil {
log.Printf("Erro ao criar canal: %v", err)
time.Sleep(5 * time.Second)
continue
}
msgs, err := ch.Consume(queueName, "", false, false, false, false, nil)
if err != nil {
log.Printf("Erro ao iniciar consumer: %v", err)
ch.Close()
time.Sleep(5 * time.Second)
continue
}
log.Println("✅ Consumer iniciado com sucesso")
for msg := range msgs {
if err := handler(msg.Body); err != nil {
log.Printf("❌ Erro no handler: %v", err)
msg.Nack(false, true) // requeue
} else {
msg.Ack(false)
}
}
log.Println("⚠️ Canal fechado, reconectando...")
ch.Close()
time.Sleep(5 * time.Second)
}
}
Boas Práticas
1. Connection e Channel Management
type RabbitMQClient struct {
conn *amqp.Connection
channel *amqp.Channel
mu sync.Mutex
}
func NewRabbitMQClient(url string) (*RabbitMQClient, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, err
}
return &RabbitMQClient{
conn: conn,
channel: ch,
}, nil
}
func (c *RabbitMQClient) Close() {
c.channel.Close()
c.conn.Close()
}
2. Configurações de Produção
func productionConfig() *amqp.Config {
return &amqp.Config{
Heartbeat: 10 * time.Second,
Locale: "en_US",
Properties: amqp.Table{
"connection_name": "go-app-production",
},
}
}
3. Logging e Observabilidade
func consumeWithMetrics(ch *amqp.Channel, queueName string) error {
msgs, err := ch.Consume(queueName, "", false, false, false, false, nil)
if err != nil {
return err
}
for msg := range msgs {
start := time.Now()
err := processMessage(msg.Body)
duration := time.Since(start)
// Métricas
if err != nil {
log.Printf("[ERROR] queue=%s duration=%v error=%v", queueName, duration, err)
msg.Nack(false, true)
} else {
log.Printf("[SUCCESS] queue=%s duration=%v", queueName, duration)
msg.Ack(false)
}
}
return nil
}
Exemplo Completo: Sistema de Processamento de Pedidos
package main
import (
"encoding/json"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type Order struct {
ID string `json:"id"`
Customer string `json:"customer"`
Product string `json:"product"`
Quantity int `json:"quantity"`
Timestamp time.Time `json:"timestamp"`
}
func main() {
conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
// Setup
setupOrderProcessing(ch)
// Iniciar consumer em goroutine
go consumeOrders(ch)
// Publicar algumas ordens de exemplo
for i := 1; i <= 5; i++ {
order := Order{
ID: fmt.Sprintf("ORD-%d", i),
Customer: fmt.Sprintf("Cliente %d", i),
Product: "Produto A",
Quantity: i * 2,
Timestamp: time.Now(),
}
if err := publishOrder(ch, order); err != nil {
log.Printf("Erro ao publicar: %v", err)
}
time.Sleep(time.Second)
}
// Manter rodando
select {}
}
func setupOrderProcessing(ch *amqp.Channel) error {
// Exchange para pedidos
err := ch.ExchangeDeclare(
"orders",
"direct",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// Fila de pedidos
_, err = ch.QueueDeclare(
"order_queue",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
return ch.QueueBind("order_queue", "new", "orders", false, nil)
}
func publishOrder(ch *amqp.Channel, order Order) error {
body, err := json.Marshal(order)
if err != nil {
return err
}
return ch.Publish(
"orders",
"new",
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
},
)
}
func consumeOrders(ch *amqp.Channel) {
msgs, err := ch.Consume(
"order_queue",
"order-processor",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
for msg := range msgs {
var order Order
if err := json.Unmarshal(msg.Body, &order); err != nil {
log.Printf("Erro ao decodificar: %v", err)
msg.Nack(false, false) // descartar
continue
}
log.Printf("📦 Processando pedido %s de %s", order.ID, order.Customer)
// Simular processamento
time.Sleep(500 * time.Millisecond)
log.Printf("✅ Pedido %s processado com sucesso!", order.ID)
msg.Ack(false)
}
}
Próximos Passos
Agora que você domina RabbitMQ com Go, explore estes tópicos avançados:
- Go e Kafka: Processamento de Streaming - Para alto throughput
- Go e Redis: Cache e Session Store - Complemente com cache
- Go Concurrency Patterns - Goroutines avançadas
- Go Observability: Logs, Métricas e Traces - Monitore sua mensageria
FAQ
Qual a diferença entre RabbitMQ e Kafka?
RabbitMQ é melhor para mensageria complexa com routing avançado, enquanto Kafka é ideal para streaming de eventos com alto throughput e retenção de longo prazo.
Como escalo consumers em RabbitMQ?
Simplesmente inicie múltiplas instâncias do mesmo consumer. RabbitMQ distribui mensagens automaticamente usando round-robin.
RabbitMQ é adequado para microserviços?
Sim! RabbitMQ é excelente para comunicação assíncrona entre microserviços, proporcionando desacoplamento e resiliência.
Como garanto que mensagens não sejam perdidas?
Use: filas duráveis (durable=true), mensagens persistentes (DeliveryMode: amqp.Persistent), e confirmações de publisher (Confirm).
Posso usar RabbitMQ em produção com Go?
Absolutamente! Empresas como Uber, Reddit e Mercado Livre usam RabbitMQ em produção com milhões de mensagens diárias.
Referências: