---
title: "Go e Kafka: Processamento de Streaming"
url: "https://golang.com.br/tutoriais/go-kafka-streaming/"
markdown_url: "https://golang.com.br/tutoriais/go-kafka-streaming.MD"
description: "Aprenda a usar Apache Kafka com Go para streaming de eventos. Tutorial completo com exemplos práticos de producers, consumers, topics, partitions e padrões de processamento."
date: "2026-02-11"
author: "Hugo"
---

# Go e Kafka: Processamento de Streaming

Aprenda a usar Apache Kafka com Go para streaming de eventos. Tutorial completo com exemplos práticos de producers, consumers, topics, partitions e padrões de processamento.


# Go e Kafka: Processamento de Streaming

**Apache Kafka** é a plataforma de streaming de eventos mais popular do mundo, usada por milhares de empresas para processar trilhões de eventos diariamente. Com **Go**, você pode construir aplicações de streaming de alta performance e baixa latência.

Neste guia completo, você vai aprender a usar Kafka com Go desde o básico até padrões avançados de processamento.

## O Que Você Vai Aprender

- Fundamentos do Apache Kafka
- Configuração do cliente Kafka em Go
- Implementação de producers e consumers
- Gerenciamento de topics e partitions
- Consumer groups para escalabilidade
- Tratamento de erros e retries
- Casos de uso do mundo real

## Por Que Kafka?

### O Problema: Processamento em Lote vs Streaming

Tradicionalmente, sistemas processavam dados em **lotes** (batch):

```
[Coleta] → [Espera 24h] → [Processamento] → [Resultado]
     ↓           ↓              ↓              ↓
   Dados      Aguarda      Processa        Delay
```

Com **streaming** usando Kafka:

```
[Evento] → [Kafka] → [Processamento em tempo real] → [Resultado]
    ↓          ↓             ↓                        ↓
  Instantâneo  Buffer       Imediato                 Agora
```

| Aspecto | Batch Processing | Stream Processing |
|---------|------------------|-------------------|
| Latência | Horas/dias | Milissegundos |
| Escalabilidade | Limitada | Horizontal |
| Tolerância a falhas | Complexa | Nativa |
| Custo | Alto (hardware) | Otimizado |

### Casos de Uso

- **E-commerce**: rastreamento de pedidos em tempo real
- **Bancos**: detecção de fraude instantânea
- **IoT**: processamento de sensores
- **Mídia**: personalização de conteúdo
- **Logs**: agregação centralizada

## Configurando o Projeto

### 1. Inicialização

```bash
mkdir go-kafka-app
cd go-kafka-app
go mod init github.com/seuusuario/go-kafka-app
```

### 2. Instalando Dependências

```bash
go get github.com/IBM/sarama
go get github.com/IBM/sarama/tools/kafka-console-producer
go get github.com/IBM/sarama/tools/kafka-console-consumer
```

Sarama é a biblioteca Go mais popular para Apache Kafka.

### 3. Docker Compose para Desenvolvimento

Crie `docker-compose.yml`:

```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
      KAFKA_LOG_RETENTION_BYTES: 1073741824

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
```

Inicie o cluster:

```bash
docker-compose up -d
```

Acesse o Kafka UI em `http://localhost:8080`

## Fundamentos do Kafka

### Conceitos Principais

```
┌─────────────────────────────────────────────────────────────┐
│                     Apache Kafka                             │
├─────────────────────────────────────────────────────────────┤
│  Topic: "orders"                                             │
│  ┌─────────────┬─────────────┬─────────────┐                │
│  │ Partition 0 │ Partition 1 │ Partition 2 │                │
│  │  Offset 0   │  Offset 0   │  Offset 0   │                │
│  │  Offset 1   │  Offset 1   │  Offset 1   │                │
│  │  Offset 2   │  Offset 2   │  Offset 2   │                │
│  │     ...     │     ...     │     ...     │                │
│  └─────────────┴─────────────┴─────────────┘                │
│                                                              │
│  Producer → │ ← Consumer Group (3 consumers) ←              │
└─────────────────────────────────────────────────────────────┘
```

| Conceito | Descrição |
|----------|-----------|
| **Topic** | Stream de dados categorizado |
| **Partition** | Subdivisão de um topic para paralelismo |
| **Offset** | Posição única de uma mensagem na partition |
| **Producer** | Envia mensagens para topics |
| **Consumer** | Lê mensagens de topics |
| **Consumer Group** | Grupo que divide o processamento |
| **Broker** | Servidor Kafka que armazena dados |

### Garantias de Entrega

```go
// Tres níveis de garantia

// 0: No acknowledgment (mais rápido, menos seguro)
config.Producer.RequiredAcks = sarama.NoResponse

// 1: Leader acknowledgment (padrão)
config.Producer.RequiredAcks = sarama.WaitForLocal

// -1: All in-sync replicas (mais seguro)
config.Producer.RequiredAcks = sarama.WaitForAll
```

| Garantia | Latência | Confiabilidade | Caso de Uso |
|----------|----------|----------------|-------------|
| `NoResponse` | ~1ms | Pode perder mensagens | Logs não-críticos |
| `WaitForLocal` | ~10ms | Lider persiste | Processamento geral |
| `WaitForAll` | ~50ms | Réplicas sincronizam | Pagamentos, transações |

## Implementando o Producer

### Producer Simples

```go
package main

import (
    "encoding/json"
    "log"
    "time"
    
    "github.com/IBM/sarama"
)

// Order representa um pedido
type Order struct {
    ID        string    `json:"id"`
    Customer  string    `json:"customer"`
    Product   string    `json:"product"`
    Amount    float64   `json:"amount"`
    Status    string    `json:"status"`
    Timestamp time.Time `json:"timestamp"`
}

type Producer struct {
    producer sarama.AsyncProducer
    topic    string
}

func NewProducer(brokers []string, topic string) (*Producer, error) {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal
    config.Producer.Compression = sarama.CompressionSnappy
    config.Producer.Flush.Frequency = 500 * time.Millisecond
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    
    producer, err := sarama.NewAsyncProducer(brokers, config)
    if err != nil {
        return nil, err
    }
    
    return &Producer{
        producer: producer,
        topic:    topic,
    }, nil
}

func (p *Producer) SendOrder(order *Order) error {
    data, err := json.Marshal(order)
    if err != nil {
        return err
    }
    
    msg := &sarama.ProducerMessage{
        Topic:     p.topic,
        Key:       sarama.StringEncoder(order.Customer),
        Value:     sarama.ByteEncoder(data),
        Timestamp: time.Now(),
        Headers: []sarama.RecordHeader{
            {
                Key:   []byte("version"),
                Value: []byte("1.0"),
            },
        },
    }
    
    p.producer.Input() <- msg
    return nil
}

func (p *Producer) Close() error {
    return p.producer.Close()
}

func (p *Producer) HandleErrors() {
    go func() {
        for err := range p.producer.Errors() {
            log.Printf("Erro ao enviar mensagem: %v\n", err)
        }
    }()
}

func (p *Producer) HandleSuccesses() {
    go func() {
        for msg := range p.producer.Successes() {
            log.Printf("Mensagem enviada: topic=%s, partition=%d, offset=%d\n",
                msg.Topic, msg.Partition, msg.Offset)
        }
    }()
}
```

### Producer com Retry

```go
type RetryProducer struct {
    producer sarama.SyncProducer
    topic    string
    maxRetries int
}

func NewRetryProducer(brokers []string, topic string, maxRetries int) (*RetryProducer, error) {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = maxRetries
    config.Producer.Retry.Backoff = 100 * time.Millisecond
    config.Producer.Return.Successes = true
    config.Producer.Idempotent = true // Exactly-once semantics
    
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        return nil, err
    }
    
    return &RetryProducer{
        producer: producer,
        topic:    topic,
        maxRetries: maxRetries,
    }, nil
}

func (p *RetryProducer) SendWithRetry(order *Order) error {
    data, err := json.Marshal(order)
    if err != nil {
        return err
    }
    
    msg := &sarama.ProducerMessage{
        Topic: p.topic,
        Key:   sarama.StringEncoder(order.ID),
        Value: sarama.ByteEncoder(data),
    }
    
    partition, offset, err := p.producer.SendMessage(msg)
    if err != nil {
        return fmt.Errorf("falha após %d retries: %w", p.maxRetries, err)
    }
    
    log.Printf("Mensagem persistida: partition=%d, offset=%d\n", partition, offset)
    return nil
}
```

### Particionamento Customizado

```go
type HashPartitioner struct{}

func (p *HashPartitioner) Partition(msg *sarama.ProducerMessage, numPartitions int32) (int32, error) {
    if msg.Key == nil {
        return sarama.NewRandomPartitioner().Partition(msg, numPartitions)
    }
    
    // Garante mesma partition para mesma key
    keyBytes, err := msg.Key.Encode()
    if err != nil {
        return -1, err
    }
    
    hash := fnv.New32a()
    hash.Write(keyBytes)
    return int32(hash.Sum32()) % numPartitions, nil
}

func (p *HashPartitioner) RequiresConsistency() bool {
    return true
}

// Uso
config := sarama.NewConfig()
config.Producer.Partitioner = func(topic string) sarama.Partitioner {
    return &HashPartitioner{}
}
```

## Implementando o Consumer

### Consumer Simples

```go
package main

import (
    "context"
    "encoding/json"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
    
    "github.com/IBM/sarama"
)

type OrderConsumer struct {
    consumer sarama.Consumer
    topic    string
    handler  OrderHandler
}

type OrderHandler interface {
    HandleOrder(order Order) error
}

type OrderProcessor struct{}

func (p *OrderProcessor) HandleOrder(order Order) error {
    log.Printf("Processando pedido: ID=%s, Customer=%s, Amount=%.2f\n",
        order.ID, order.Customer, order.Amount)
    
    // Processamento real aqui
    time.Sleep(100 * time.Millisecond)
    
    log.Printf("Pedido processado: %s\n", order.ID)
    return nil
}

func NewOrderConsumer(brokers []string, topic string, handler OrderHandler) (*OrderConsumer, error) {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Consumer.Offsets.AutoCommit.Enable = false
    
    consumer, err := sarama.NewConsumer(brokers, config)
    if err != nil {
        return nil, err
    }
    
    return &OrderConsumer{
        consumer: consumer,
        topic:    topic,
        handler:  handler,
    }, nil
}

func (c *OrderConsumer) ConsumePartitions(ctx context.Context) error {
    // Obtém lista de partitions
    partitions, err := c.consumer.Partitions(c.topic)
    if err != nil {
        return err
    }
    
    var wg sync.WaitGroup
    errors := make(chan error, len(partitions))
    
    // Consome cada partition em paralelo
    for _, partition := range partitions {
        wg.Add(1)
        go func(p int32) {
            defer wg.Done()
            if err := c.consumePartition(ctx, p); err != nil {
                errors <- err
            }
        }(partition)
    }
    
    // Espera signal de término
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    
    select {
    case <-sigterm:
        log.Println("Recebido sinal de término")
    case err := <-errors:
        log.Printf("Erro no consumo: %v\n", err)
    }
    
    wg.Wait()
    return c.consumer.Close()
}

func (c *OrderConsumer) consumePartition(ctx context.Context, partition int32) error {
    pc, err := c.consumer.ConsumePartition(c.topic, partition, sarama.OffsetNewest)
    if err != nil {
        return err
    }
    defer pc.Close()
    
    log.Printf("Iniciando consumo da partition %d\n", partition)
    
    for {
        select {
        case <-ctx.Done():
            return nil
            
        case msg := <-pc.Messages():
            var order Order
            if err := json.Unmarshal(msg.Value, &order); err != nil {
                log.Printf("Erro ao deserializar: %v\n", err)
                continue
            }
            
            // Processa a mensagem
            if err := c.handler.HandleOrder(order); err != nil {
                log.Printf("Erro ao processar pedido: %v\n", err)
                continue
            }
            
            log.Printf("Partition %d - Offset %d processado\n", msg.Partition, msg.Offset)
            
        case err := <-pc.Errors():
            log.Printf("Erro na partition %d: %v\n", partition, err)
        }
    }
}
```

### Consumer Groups (Escalabilidade)

```go
package main

import (
    "context"
    "encoding/json"
    "log"
    "time"
    
    "github.com/IBM/sarama"
)

// OrderConsumerGroup implementa sarama.ConsumerGroupHandler
type OrderConsumerGroup struct {
    ready   chan bool
    readyMu sync.Mutex
}

func NewOrderConsumerGroup() *OrderConsumerGroup {
    return &OrderConsumerGroup{
        ready: make(chan bool),
    }
}

// Setup é executado quando o consumer inicia
func (c *OrderConsumerGroup) Setup(session sarama.ConsumerGroupSession) error {
    log.Printf("Setup do consumer group. Claims: %v\n", session.Claims())
    c.readyMu.Lock()
    close(c.ready)
    c.readyMu.Unlock()
    return nil
}

// Cleanup é executado quando o consumer termina
func (c *OrderConsumerGroup) Cleanup(session sarama.ConsumerGroupSession) error {
    log.Println("Cleanup do consumer group")
    return nil
}

// ConsumeClaim processa mensagens de uma partition
func (c *OrderConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    log.Printf("Iniciando processamento da partition %d\n", claim.Partition())
    
    for {
        select {
        case <-session.Context().Done():
            return nil
            
        case msg := <-claim.Messages():
            if msg == nil {
                return nil
            }
            
            log.Printf("[%s] Partition=%d Offset=%d Key=%s\n",
                claim.Topic(), msg.Partition, msg.Offset, msg.Key)
            
            // Processa a mensagem
            var order Order
            if err := json.Unmarshal(msg.Value, &order); err != nil {
                log.Printf("Erro ao deserializar: %v\n", err)
                session.MarkMessage(msg, "failed")
                continue
            }
            
            // Simula processamento
            if err := processOrder(order); err != nil {
                log.Printf("Erro ao processar: %v\n", err)
                // Não marca como processado - será reprocessado
                continue
            }
            
            // Confirma processamento
            session.MarkMessage(msg, "")
            log.Printf("Mensagem processada: order=%s\n", order.ID)
        }
    }
}

func processOrder(order Order) error {
    // Implemente seu processamento aqui
    log.Printf("Processando order=%s, customer=%s\n", order.ID, order.Customer)
    time.Sleep(100 * time.Millisecond)
    return nil
}

func StartConsumerGroup(ctx context.Context, brokers []string, topic string, groupID string) error {
    config := sarama.NewConfig()
    config.Version = sarama.V2_6_0_0
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    
    client, err := sarama.NewConsumerGroup(brokers, groupID, config)
    if err != nil {
        return err
    }
    defer client.Close()
    
    consumer := NewOrderConsumerGroup()
    
    log.Printf("Iniciando consumer group: %s\n", groupID)
    
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
        }
        
        if err := client.Consume(ctx, []string{topic}, consumer); err != nil {
            log.Printf("Erro no consumo: %v\n", err)
            time.Sleep(5 * time.Second)
        }
    }
}
```

### Rebalance Strategies

```go
// 1. Range (padrão)
// Divide partitions em ranges proporcionais
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange

// 2. RoundRobin (mais equilibrado)
// Distribui partitions sequencialmente
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

// 3. Sticky (balanceada e estável)
// Mantém assignments anteriores quando possível
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
```

| Estratégia | Uso Recomendado |
|------------|-----------------|
| Range | Partições bem distribuídas em topics múltiplos |
| RoundRobin | Melhor distribuição geral |
| Sticky | Quando assignments estáveis são importantes |

## Tratamento de Erros e Resiliência

### Circuit Breaker Pattern

```go
type CircuitBreaker struct {
    failures    int
    lastFailure time.Time
    threshold   int
    timeout     time.Duration
    state       State
}

type State int

const (
    StateClosed State = iota // Normal operation
    StateOpen                // Failing fast
    StateHalfOpen            // Testing if recovered
)

func (cb *CircuitBreaker) Call(fn func() error) error {
    cb.mutex.Lock()
    
    if cb.state == StateOpen {
        if time.Since(cb.lastFailure) > cb.timeout {
            cb.state = StateHalfOpen
        } else {
            cb.mutex.Unlock()
            return errors.New("circuit breaker open")
        }
    }
    cb.mutex.Unlock()
    
    err := fn()
    
    cb.mutex.Lock()
    defer cb.mutex.Unlock()
    
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        
        if cb.failures >= cb.threshold {
            cb.state = StateOpen
        }
        return err
    }
    
    // Success - reset
    cb.failures = 0
    cb.state = StateClosed
    return nil
}
```

### Dead Letter Queue

```go
func (c *Consumer) handleMessage(msg *sarama.ConsumerMessage) {
    var event Event
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        log.Printf("Erro de parsing: %v\n", err)
        c.sendToDLQ(msg, "parse_error")
        return
    }
    
    // Tenta processar com retry
    for attempt := 0; attempt < 3; attempt++ {
        if err := c.process(event); err == nil {
            return
        }
        time.Sleep(time.Duration(attempt) * time.Second)
    }
    
    // Falha após retries - envia para DLQ
    log.Printf("Enviando para DLQ após falhas. Event: %s\n", event.ID)
    c.sendToDLQ(msg, "processing_failed")
}

func (c *Consumer) sendToDLQ(msg *sarama.ConsumerMessage, reason string) {
    dlqMsg := &sarama.ProducerMessage{
        Topic: "orders.dlq",
        Key:   msg.Key,
        Value: msg.Value,
        Headers: []sarama.RecordHeader{
            {Key: []byte("original-topic"), Value: []byte(msg.Topic)},
            {Key: []byte("failure-reason"), Value: []byte(reason)},
            {Key: []byte("timestamp"), Value: []byte(time.Now().Format(time.RFC3339))},
        },
    }
    c.dlqProducer.SendMessage(dlqMsg)
}
```

### Exponential Backoff

```go
func consumeWithBackoff(ctx context.Context, consumer sarama.Consumer, topic string) error {
    maxRetries := 5
    backoff := time.Second
    
    for attempt := 0; attempt < maxRetries; attempt++ {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        
        err := consume(consumer, topic)
        if err == nil {
            return nil
        }
        
        if attempt < maxRetries-1 {
            sleep := backoff * time.Duration(1<<attempt) // 1s, 2s, 4s, 8s, 16s
            log.Printf("Tentativa %d falhou, aguardando %v\n", attempt+1, sleep)
            time.Sleep(sleep)
        }
    }
    
    return fmt.Errorf("falhou após %d tentativas", maxRetries)
}
```

## Padrões Avançados

### Windowing (Janelas de Processamento)

```go
type WindowedProcessor struct {
    window    time.Duration
    buffer    []Event
    lastFlush time.Time
}

func (p *WindowedProcessor) Process(event Event) {
    p.buffer = append(p.buffer, event)
    
    if time.Since(p.lastFlush) >= p.window {
        p.flush()
    }
}

func (p *WindowedProcessor) flush() {
    if len(p.buffer) == 0 {
        return
    }
    
    // Processing batch
    log.Printf("Processando janela de %d eventos\n", len(p.buffer))
    
    // Database insert batch
    // etc.
    
    p.buffer = p.buffer[:0]
    p.lastFlush = time.Now()
}
```

### Join de Streams

```go
type StreamJoiner struct {
    orders     map[string]Order
    payments   map[string]Payment
    cacheMutex sync.RWMutex
}

func (j *StreamJoiner) ProcessOrder(order Order) {
    j.cacheMutex.Lock()
    j.orders[order.ID] = order
    j.cacheMutex.Unlock()
    
    // Verifica se tem payment correspondente
    j.tryJoin(order.ID)
}

func (j *StreamJoiner) ProcessPayment(payment Payment) {
    j.cacheMutex.Lock()
    j.payments[payment.OrderID] = payment
    j.cacheMutex.Unlock()
    
    // Verifica se tem order correspondente
    j.tryJoin(payment.OrderID)
}

func (j *StreamJoiner) tryJoin(orderID string) {
    j.cacheMutex.Lock()
    defer j.cacheMutex.Unlock()
    
    order, hasOrder := j.orders[orderID]
    payment, hasPayment := j.payments[orderID]
    
    if hasOrder && hasPayment {
        // Temos ambos! Processa o join
        completeOrder := CompleteOrder{
            Order:   order,
            Payment: payment,
        }
        
        log.Printf("Join completo para order=%s\n", orderID)
        sendToNextStage(completeOrder)
        
        // Remove do cache
        delete(j.orders, orderID)
        delete(j.payments, orderID)
    }
}
```

## Casos de Uso do Mundo Real

### 1. E-commerce: Pipeline de Pedidos

```go
func main() {
    // Producer: Recebe pedidos via HTTP
    go startOrderProducer()
    
    // Consumer 1: Valida pedidos
    go startValidationConsumer()
    
    // Consumer 2: Processa pagamentos
    go startPaymentConsumer()
    
    // Consumer 3: Atualiza estoque
    go startInventoryConsumer()
    
    // Consumer 4: Envia notificações
    go startNotificationConsumer()
}
```

### 2. Monitoramento de IoT

```go
func processSensorData(data SensorData) error {
    // Validação
    if data.Temperature > 100 {
        alert := Alert{
            Device:  data.DeviceID,
            Type:    "高温警報",
            Value:   data.Temperature,
            Time:    time.Now(),
        }
        sendAlert(alert)
    }
    
    // Agregação
    if data.Aggregatable {
        batch := getBatch(data.DeviceID)
        batch.Add(data)
        
        if batch.IsFull() {
            sendToDatabase(batch.Aggregate())
        }
    }
    
    return nil
}
```

## Testes

### Teste com Kafka Embutido

```go
type MockKafka struct {
    messages chan *sarama.ProducerMessage
    consumers []chan *sarama.ConsumerMessage
}

func TestProducer(t *testing.T) {
    topic := "test-orders"
    mock := NewMockKafka()
    
    producer := NewMockProducer(mock, topic)
    
    order := &Order{
        ID:       "123",
        Customer: "Test",
        Amount:   100.0,
    }
    
    err := producer.SendOrder(order)
    if err != nil {
        t.Fatalf("erro ao enviar: %v", err)
    }
    
    // Verifica mensagem recebida
    select {
    case msg := <-mock.messages:
        var received Order
        json.Unmarshal(msg.Value.([]byte), &received)
        
        if received.ID != order.ID {
            t.Errorf("ID mismatch: got %s, want %s", received.ID, order.ID)
        }
    case <-time.After(time.Second):
        t.Error("timeout esperando mensagem")
    }
}
```

## Próximos Passos

Agora que você domina Kafka com Go:

1. **Kafka Connect**: Integre com sistemas externos
2. **Kafka Streams**: Processamento stateful
3. **Schema Registry**: Gerenciamento de schemas Avro/Protobuf
4. **KSQL**: Queries SQL em streams

Explore mais tutoriais de Go:
- [Go e NATS: Mensageria Leve](/tutoriais/go-nats/)
- [Go e RabbitMQ: Mensageria Assíncrona](/tutoriais/go-rabbitmq/)
- [Go GraphQL: Criando APIs com gqlgen](/tutoriais/go-graphql-gqlgen/)
- [Go e Redis: Cache e Session Store](/tutoriais/go-redis-cache/)

## FAQ

**Q: Kafka ou RabbitMQ: qual escolher?**  
R: Kafka para streaming de alto volume e persistência. RabbitMQ para mensageria flexível com routing complexo.

**Q: Quantas partitions devo criar?**  
R: Regra geral: 2x o número máximo de consumers. Para 5 consumers, use 10 partitions.

**Q: Como garantir ordem das mensagens?**  
R: Use a mesma key para mensagens relacionadas. Kafka garante ordem por key dentro da mesma partition.

**Q: Kafka pode rodar em containers?**  
R: Sim, mas use volumes persistentes e considere usar Strimzi ou Kafka Operator para produção.

**Q: Qual a diferença entre OffsetOldest e OffsetNewest?**  
R: Oldest processa todas as mensagens desde o início. Newest processa apenas mensagens novas.

---

*Última atualização: 11 de fevereiro de 2026*
