Go e Kafka: Processamento de Streaming
Apache Kafka é a plataforma de streaming de eventos mais popular do mundo, usada por milhares de empresas para processar trilhões de eventos diariamente. Com Go, você pode construir aplicações de streaming de alta performance e baixa latência.
Neste guia completo, você vai aprender a usar Kafka com Go desde o básico até padrões avançados de processamento.
O Que Você Vai Aprender
- Fundamentos do Apache Kafka
- Configuração do cliente Kafka em Go
- Implementação de producers e consumers
- Gerenciamento de topics e partitions
- Consumer groups para escalabilidade
- Tratamento de erros e retries
- Casos de uso do mundo real
Por Que Kafka?
O Problema: Processamento em Lote vs Streaming
Tradicionalmente, sistemas processavam dados em lotes (batch):
[Coleta] → [Espera 24h] → [Processamento] → [Resultado]
↓ ↓ ↓ ↓
Dados Aguarda Processa Delay
Com streaming usando Kafka:
[Evento] → [Kafka] → [Processamento em tempo real] → [Resultado]
↓ ↓ ↓ ↓
Instantâneo Buffer Imediato Agora
| Aspecto | Batch Processing | Stream Processing |
|---|---|---|
| Latência | Horas/dias | Milissegundos |
| Escalabilidade | Limitada | Horizontal |
| Tolerância a falhas | Complexa | Nativa |
| Custo | Alto (hardware) | Otimizado |
Casos de Uso
- E-commerce: rastreamento de pedidos em tempo real
- Bancos: detecção de fraude instantânea
- IoT: processamento de sensores
- Mídia: personalização de conteúdo
- Logs: agregação centralizada
Configurando o Projeto
1. Inicialização
mkdir go-kafka-app
cd go-kafka-app
go mod init github.com/seuusuario/go-kafka-app
2. Instalando Dependências
go get github.com/IBM/sarama
go get github.com/IBM/sarama/tools/kafka-console-producer
go get github.com/IBM/sarama/tools/kafka-console-consumer
Sarama é a biblioteca Go mais popular para Apache Kafka.
3. Docker Compose para Desenvolvimento
Crie docker-compose.yml:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_SEGMENT_BYTES: 1073741824
KAFKA_LOG_RETENTION_BYTES: 1073741824
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
Inicie o cluster:
docker-compose up -d
Acesse o Kafka UI em http://localhost:8080
Fundamentos do Kafka
Conceitos Principais
┌─────────────────────────────────────────────────────────────┐
│ Apache Kafka │
├─────────────────────────────────────────────────────────────┤
│ Topic: "orders" │
│ ┌─────────────┬─────────────┬─────────────┐ │
│ │ Partition 0 │ Partition 1 │ Partition 2 │ │
│ │ Offset 0 │ Offset 0 │ Offset 0 │ │
│ │ Offset 1 │ Offset 1 │ Offset 1 │ │
│ │ Offset 2 │ Offset 2 │ Offset 2 │ │
│ │ ... │ ... │ ... │ │
│ └─────────────┴─────────────┴─────────────┘ │
│ │
│ Producer → │ ← Consumer Group (3 consumers) ← │
└─────────────────────────────────────────────────────────────┘
| Conceito | Descrição |
|---|---|
| Topic | Stream de dados categorizado |
| Partition | Subdivisão de um topic para paralelismo |
| Offset | Posição única de uma mensagem na partition |
| Producer | Envia mensagens para topics |
| Consumer | Lê mensagens de topics |
| Consumer Group | Grupo que divide o processamento |
| Broker | Servidor Kafka que armazena dados |
Garantias de Entrega
// Tres níveis de garantia
// 0: No acknowledgment (mais rápido, menos seguro)
config.Producer.RequiredAcks = sarama.NoResponse
// 1: Leader acknowledgment (padrão)
config.Producer.RequiredAcks = sarama.WaitForLocal
// -1: All in-sync replicas (mais seguro)
config.Producer.RequiredAcks = sarama.WaitForAll
| Garantia | Latência | Confiabilidade | Caso de Uso |
|---|---|---|---|
NoResponse | ~1ms | Pode perder mensagens | Logs não-críticos |
WaitForLocal | ~10ms | Lider persiste | Processamento geral |
WaitForAll | ~50ms | Réplicas sincronizam | Pagamentos, transações |
Implementando o Producer
Producer Simples
package main
import (
"encoding/json"
"log"
"time"
"github.com/IBM/sarama"
)
// Order representa um pedido
type Order struct {
ID string `json:"id"`
Customer string `json:"customer"`
Product string `json:"product"`
Amount float64 `json:"amount"`
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
}
type Producer struct {
producer sarama.AsyncProducer
topic string
}
func NewProducer(brokers []string, topic string) (*Producer, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = 500 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
return nil, err
}
return &Producer{
producer: producer,
topic: topic,
}, nil
}
func (p *Producer) SendOrder(order *Order) error {
data, err := json.Marshal(order)
if err != nil {
return err
}
msg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(order.Customer),
Value: sarama.ByteEncoder(data),
Timestamp: time.Now(),
Headers: []sarama.RecordHeader{
{
Key: []byte("version"),
Value: []byte("1.0"),
},
},
}
p.producer.Input() <- msg
return nil
}
func (p *Producer) Close() error {
return p.producer.Close()
}
func (p *Producer) HandleErrors() {
go func() {
for err := range p.producer.Errors() {
log.Printf("Erro ao enviar mensagem: %v\n", err)
}
}()
}
func (p *Producer) HandleSuccesses() {
go func() {
for msg := range p.producer.Successes() {
log.Printf("Mensagem enviada: topic=%s, partition=%d, offset=%d\n",
msg.Topic, msg.Partition, msg.Offset)
}
}()
}
Producer com Retry
type RetryProducer struct {
producer sarama.SyncProducer
topic string
maxRetries int
}
func NewRetryProducer(brokers []string, topic string, maxRetries int) (*RetryProducer, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = maxRetries
config.Producer.Retry.Backoff = 100 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Idempotent = true // Exactly-once semantics
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
return nil, err
}
return &RetryProducer{
producer: producer,
topic: topic,
maxRetries: maxRetries,
}, nil
}
func (p *RetryProducer) SendWithRetry(order *Order) error {
data, err := json.Marshal(order)
if err != nil {
return err
}
msg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(order.ID),
Value: sarama.ByteEncoder(data),
}
partition, offset, err := p.producer.SendMessage(msg)
if err != nil {
return fmt.Errorf("falha após %d retries: %w", p.maxRetries, err)
}
log.Printf("Mensagem persistida: partition=%d, offset=%d\n", partition, offset)
return nil
}
Particionamento Customizado
type HashPartitioner struct{}
func (p *HashPartitioner) Partition(msg *sarama.ProducerMessage, numPartitions int32) (int32, error) {
if msg.Key == nil {
return sarama.NewRandomPartitioner().Partition(msg, numPartitions)
}
// Garante mesma partition para mesma key
keyBytes, err := msg.Key.Encode()
if err != nil {
return -1, err
}
hash := fnv.New32a()
hash.Write(keyBytes)
return int32(hash.Sum32()) % numPartitions, nil
}
func (p *HashPartitioner) RequiresConsistency() bool {
return true
}
// Uso
config := sarama.NewConfig()
config.Producer.Partitioner = func(topic string) sarama.Partitioner {
return &HashPartitioner{}
}
Implementando o Consumer
Consumer Simples
package main
import (
"context"
"encoding/json"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/IBM/sarama"
)
type OrderConsumer struct {
consumer sarama.Consumer
topic string
handler OrderHandler
}
type OrderHandler interface {
HandleOrder(order Order) error
}
type OrderProcessor struct{}
func (p *OrderProcessor) HandleOrder(order Order) error {
log.Printf("Processando pedido: ID=%s, Customer=%s, Amount=%.2f\n",
order.ID, order.Customer, order.Amount)
// Processamento real aqui
time.Sleep(100 * time.Millisecond)
log.Printf("Pedido processado: %s\n", order.ID)
return nil
}
func NewOrderConsumer(brokers []string, topic string, handler OrderHandler) (*OrderConsumer, error) {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.AutoCommit.Enable = false
consumer, err := sarama.NewConsumer(brokers, config)
if err != nil {
return nil, err
}
return &OrderConsumer{
consumer: consumer,
topic: topic,
handler: handler,
}, nil
}
func (c *OrderConsumer) ConsumePartitions(ctx context.Context) error {
// Obtém lista de partitions
partitions, err := c.consumer.Partitions(c.topic)
if err != nil {
return err
}
var wg sync.WaitGroup
errors := make(chan error, len(partitions))
// Consome cada partition em paralelo
for _, partition := range partitions {
wg.Add(1)
go func(p int32) {
defer wg.Done()
if err := c.consumePartition(ctx, p); err != nil {
errors <- err
}
}(partition)
}
// Espera signal de término
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigterm:
log.Println("Recebido sinal de término")
case err := <-errors:
log.Printf("Erro no consumo: %v\n", err)
}
wg.Wait()
return c.consumer.Close()
}
func (c *OrderConsumer) consumePartition(ctx context.Context, partition int32) error {
pc, err := c.consumer.ConsumePartition(c.topic, partition, sarama.OffsetNewest)
if err != nil {
return err
}
defer pc.Close()
log.Printf("Iniciando consumo da partition %d\n", partition)
for {
select {
case <-ctx.Done():
return nil
case msg := <-pc.Messages():
var order Order
if err := json.Unmarshal(msg.Value, &order); err != nil {
log.Printf("Erro ao deserializar: %v\n", err)
continue
}
// Processa a mensagem
if err := c.handler.HandleOrder(order); err != nil {
log.Printf("Erro ao processar pedido: %v\n", err)
continue
}
log.Printf("Partition %d - Offset %d processado\n", msg.Partition, msg.Offset)
case err := <-pc.Errors():
log.Printf("Erro na partition %d: %v\n", partition, err)
}
}
}
Consumer Groups (Escalabilidade)
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/IBM/sarama"
)
// OrderConsumerGroup implementa sarama.ConsumerGroupHandler
type OrderConsumerGroup struct {
ready chan bool
readyMu sync.Mutex
}
func NewOrderConsumerGroup() *OrderConsumerGroup {
return &OrderConsumerGroup{
ready: make(chan bool),
}
}
// Setup é executado quando o consumer inicia
func (c *OrderConsumerGroup) Setup(session sarama.ConsumerGroupSession) error {
log.Printf("Setup do consumer group. Claims: %v\n", session.Claims())
c.readyMu.Lock()
close(c.ready)
c.readyMu.Unlock()
return nil
}
// Cleanup é executado quando o consumer termina
func (c *OrderConsumerGroup) Cleanup(session sarama.ConsumerGroupSession) error {
log.Println("Cleanup do consumer group")
return nil
}
// ConsumeClaim processa mensagens de uma partition
func (c *OrderConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
log.Printf("Iniciando processamento da partition %d\n", claim.Partition())
for {
select {
case <-session.Context().Done():
return nil
case msg := <-claim.Messages():
if msg == nil {
return nil
}
log.Printf("[%s] Partition=%d Offset=%d Key=%s\n",
claim.Topic(), msg.Partition, msg.Offset, msg.Key)
// Processa a mensagem
var order Order
if err := json.Unmarshal(msg.Value, &order); err != nil {
log.Printf("Erro ao deserializar: %v\n", err)
session.MarkMessage(msg, "failed")
continue
}
// Simula processamento
if err := processOrder(order); err != nil {
log.Printf("Erro ao processar: %v\n", err)
// Não marca como processado - será reprocessado
continue
}
// Confirma processamento
session.MarkMessage(msg, "")
log.Printf("Mensagem processada: order=%s\n", order.ID)
}
}
}
func processOrder(order Order) error {
// Implemente seu processamento aqui
log.Printf("Processando order=%s, customer=%s\n", order.ID, order.Customer)
time.Sleep(100 * time.Millisecond)
return nil
}
func StartConsumerGroup(ctx context.Context, brokers []string, topic string, groupID string) error {
config := sarama.NewConfig()
config.Version = sarama.V2_6_0_0
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
client, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
return err
}
defer client.Close()
consumer := NewOrderConsumerGroup()
log.Printf("Iniciando consumer group: %s\n", groupID)
for {
select {
case <-ctx.Done():
return nil
default:
}
if err := client.Consume(ctx, []string{topic}, consumer); err != nil {
log.Printf("Erro no consumo: %v\n", err)
time.Sleep(5 * time.Second)
}
}
}
Rebalance Strategies
// 1. Range (padrão)
// Divide partitions em ranges proporcionais
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
// 2. RoundRobin (mais equilibrado)
// Distribui partitions sequencialmente
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
// 3. Sticky (balanceada e estável)
// Mantém assignments anteriores quando possível
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
| Estratégia | Uso Recomendado |
|---|---|
| Range | Partições bem distribuídas em topics múltiplos |
| RoundRobin | Melhor distribuição geral |
| Sticky | Quando assignments estáveis são importantes |
Tratamento de Erros e Resiliência
Circuit Breaker Pattern
type CircuitBreaker struct {
failures int
lastFailure time.Time
threshold int
timeout time.Duration
state State
}
type State int
const (
StateClosed State = iota // Normal operation
StateOpen // Failing fast
StateHalfOpen // Testing if recovered
)
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mutex.Lock()
if cb.state == StateOpen {
if time.Since(cb.lastFailure) > cb.timeout {
cb.state = StateHalfOpen
} else {
cb.mutex.Unlock()
return errors.New("circuit breaker open")
}
}
cb.mutex.Unlock()
err := fn()
cb.mutex.Lock()
defer cb.mutex.Unlock()
if err != nil {
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.threshold {
cb.state = StateOpen
}
return err
}
// Success - reset
cb.failures = 0
cb.state = StateClosed
return nil
}
Dead Letter Queue
func (c *Consumer) handleMessage(msg *sarama.ConsumerMessage) {
var event Event
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("Erro de parsing: %v\n", err)
c.sendToDLQ(msg, "parse_error")
return
}
// Tenta processar com retry
for attempt := 0; attempt < 3; attempt++ {
if err := c.process(event); err == nil {
return
}
time.Sleep(time.Duration(attempt) * time.Second)
}
// Falha após retries - envia para DLQ
log.Printf("Enviando para DLQ após falhas. Event: %s\n", event.ID)
c.sendToDLQ(msg, "processing_failed")
}
func (c *Consumer) sendToDLQ(msg *sarama.ConsumerMessage, reason string) {
dlqMsg := &sarama.ProducerMessage{
Topic: "orders.dlq",
Key: msg.Key,
Value: msg.Value,
Headers: []sarama.RecordHeader{
{Key: []byte("original-topic"), Value: []byte(msg.Topic)},
{Key: []byte("failure-reason"), Value: []byte(reason)},
{Key: []byte("timestamp"), Value: []byte(time.Now().Format(time.RFC3339))},
},
}
c.dlqProducer.SendMessage(dlqMsg)
}
Exponential Backoff
func consumeWithBackoff(ctx context.Context, consumer sarama.Consumer, topic string) error {
maxRetries := 5
backoff := time.Second
for attempt := 0; attempt < maxRetries; attempt++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
err := consume(consumer, topic)
if err == nil {
return nil
}
if attempt < maxRetries-1 {
sleep := backoff * time.Duration(1<<attempt) // 1s, 2s, 4s, 8s, 16s
log.Printf("Tentativa %d falhou, aguardando %v\n", attempt+1, sleep)
time.Sleep(sleep)
}
}
return fmt.Errorf("falhou após %d tentativas", maxRetries)
}
Padrões Avançados
Windowing (Janelas de Processamento)
type WindowedProcessor struct {
window time.Duration
buffer []Event
lastFlush time.Time
}
func (p *WindowedProcessor) Process(event Event) {
p.buffer = append(p.buffer, event)
if time.Since(p.lastFlush) >= p.window {
p.flush()
}
}
func (p *WindowedProcessor) flush() {
if len(p.buffer) == 0 {
return
}
// Processing batch
log.Printf("Processando janela de %d eventos\n", len(p.buffer))
// Database insert batch
// etc.
p.buffer = p.buffer[:0]
p.lastFlush = time.Now()
}
Join de Streams
type StreamJoiner struct {
orders map[string]Order
payments map[string]Payment
cacheMutex sync.RWMutex
}
func (j *StreamJoiner) ProcessOrder(order Order) {
j.cacheMutex.Lock()
j.orders[order.ID] = order
j.cacheMutex.Unlock()
// Verifica se tem payment correspondente
j.tryJoin(order.ID)
}
func (j *StreamJoiner) ProcessPayment(payment Payment) {
j.cacheMutex.Lock()
j.payments[payment.OrderID] = payment
j.cacheMutex.Unlock()
// Verifica se tem order correspondente
j.tryJoin(payment.OrderID)
}
func (j *StreamJoiner) tryJoin(orderID string) {
j.cacheMutex.Lock()
defer j.cacheMutex.Unlock()
order, hasOrder := j.orders[orderID]
payment, hasPayment := j.payments[orderID]
if hasOrder && hasPayment {
// Temos ambos! Processa o join
completeOrder := CompleteOrder{
Order: order,
Payment: payment,
}
log.Printf("Join completo para order=%s\n", orderID)
sendToNextStage(completeOrder)
// Remove do cache
delete(j.orders, orderID)
delete(j.payments, orderID)
}
}
Casos de Uso do Mundo Real
1. E-commerce: Pipeline de Pedidos
func main() {
// Producer: Recebe pedidos via HTTP
go startOrderProducer()
// Consumer 1: Valida pedidos
go startValidationConsumer()
// Consumer 2: Processa pagamentos
go startPaymentConsumer()
// Consumer 3: Atualiza estoque
go startInventoryConsumer()
// Consumer 4: Envia notificações
go startNotificationConsumer()
}
2. Monitoramento de IoT
func processSensorData(data SensorData) error {
// Validação
if data.Temperature > 100 {
alert := Alert{
Device: data.DeviceID,
Type: "高温警報",
Value: data.Temperature,
Time: time.Now(),
}
sendAlert(alert)
}
// Agregação
if data.Aggregatable {
batch := getBatch(data.DeviceID)
batch.Add(data)
if batch.IsFull() {
sendToDatabase(batch.Aggregate())
}
}
return nil
}
Testes
Teste com Kafka Embutido
type MockKafka struct {
messages chan *sarama.ProducerMessage
consumers []chan *sarama.ConsumerMessage
}
func TestProducer(t *testing.T) {
topic := "test-orders"
mock := NewMockKafka()
producer := NewMockProducer(mock, topic)
order := &Order{
ID: "123",
Customer: "Test",
Amount: 100.0,
}
err := producer.SendOrder(order)
if err != nil {
t.Fatalf("erro ao enviar: %v", err)
}
// Verifica mensagem recebida
select {
case msg := <-mock.messages:
var received Order
json.Unmarshal(msg.Value.([]byte), &received)
if received.ID != order.ID {
t.Errorf("ID mismatch: got %s, want %s", received.ID, order.ID)
}
case <-time.After(time.Second):
t.Error("timeout esperando mensagem")
}
}
Próximos Passos
Agora que você domina Kafka com Go:
- Kafka Connect: Integre com sistemas externos
- Kafka Streams: Processamento stateful
- Schema Registry: Gerenciamento de schemas Avro/Protobuf
- KSQL: Queries SQL em streams
Explore mais tutoriais de Go:
- Go e NATS: Mensageria Leve
- Go e RabbitMQ: Mensageria Assíncrona
- Go GraphQL: Criando APIs com gqlgen
- Go e Redis: Cache e Session Store
FAQ
Q: Kafka ou RabbitMQ: qual escolher?
R: Kafka para streaming de alto volume e persistência. RabbitMQ para mensageria flexível com routing complexo.
Q: Quantas partitions devo criar?
R: Regra geral: 2x o número máximo de consumers. Para 5 consumers, use 10 partitions.
Q: Como garantir ordem das mensagens?
R: Use a mesma key para mensagens relacionadas. Kafka garante ordem por key dentro da mesma partition.
Q: Kafka pode rodar em containers?
R: Sim, mas use volumes persistentes e considere usar Strimzi ou Kafka Operator para produção.
Q: Qual a diferença entre OffsetOldest e OffsetNewest?
R: Oldest processa todas as mensagens desde o início. Newest processa apenas mensagens novas.
Última atualização: 11 de fevereiro de 2026