Janeiro 2026 · ~13 min

Go Concurrency Patterns: Goroutines e Channels Avançados

Aprenda patterns avançados de concorrência em Go: worker pools, fan-out/fan-in, rate limiting com channels, e context package. Tutorial completo com exemplos práticos de código.

A concorrência é um dos recursos mais poderosos de Go. Enquanto muitas linguagens tratam concorrência como um recurso avançado complexo, Go a torna acessível através de goroutines e channels. Este tutorial explora patterns avançados que você encontrará em aplicações Go de produção.

Pré-requisitos: Conhecimento básico de Go (variáveis, funções, structs) e familiaridade com goroutines e channels. Se você é novo em Go, confira nosso guia para iniciantes.

Sumário dos Patterns

PatternUse CaseComplexidade
Worker PoolProcessar tarefas em paralelo com controle⭐⭐
Fan-Out/Fan-InDistribuir trabalho e agregar resultados⭐⭐⭐
PipelineProcessar dados em estágios sequenciais⭐⭐
Rate LimitingControlar throughput de requisições⭐⭐⭐
Context CancellationCancelar operações de forma segura⭐⭐
Select StatementMultiplexar channels⭐⭐⭐

1. Worker Pool Pattern

O pattern Worker Pool é essencial quando você precisa processar um grande número de tarefas de forma concorrente, mas quer limitar o número de goroutines simultâneas.

Problema

Imagine que você precisa processar 10.000 arquivos. Iniciar 10.000 goroutines simultâneas esgotaria recursos rapidamente.

Solução: Worker Pool

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Job representa uma unidade de trabalho
type Job struct {
    ID   int
    Data string
}

// Result representa o resultado do processamento
type Result struct {
    JobID int
    Value int
    Error error
}

// Worker processa jobs do canal
type Worker struct {
    ID         int
    jobs       <-chan Job
    results    chan<- Result
    wg         *sync.WaitGroup
}

func (w *Worker) Start(ctx context.Context) {
    defer w.wg.Done()
    for {
        select {
        case job, ok := <-w.jobs:
            if !ok {
                fmt.Printf("Worker %d: canal fechado, encerrando\n", w.ID)
                return
            }
            w.processJob(job)
        case <-ctx.Done():
            fmt.Printf("Worker %d: contexto cancelado, encerrando\n", w.ID)
            return
        }
    }
}

func (w *Worker) processJob(job Job) {
    // Simula processamento
    time.Sleep(100 * time.Millisecond)
    
    result := Result{
        JobID: job.ID,
        Value: len(job.Data) * 10,
    }
    
    fmt.Printf("Worker %d: processou Job %d\n", w.ID, job.ID)
    w.results <- result
}

// Pool gerencia os workers
type Pool struct {
    numWorkers int
    jobs       chan Job
    results    chan Result
    workers    []*Worker
    wg         sync.WaitGroup
    ctx        context.Context
    cancel     context.CancelFunc
}

func NewPool(numWorkers int) *Pool {
    ctx, cancel := context.WithCancel(context.Background())
    return &Pool{
        numWorkers: numWorkers,
        jobs:       make(chan Job, 100),
        results:    make(chan Result, 100),
        ctx:        ctx,
        cancel:     cancel,
    }
}

func (p *Pool) Start() {
    for i := 1; i <= p.numWorkers; i++ {
        worker := &Worker{
            ID:      i,
            jobs:    p.jobs,
            results: p.results,
            wg:      &p.wg,
        }
        p.workers = append(p.workers, worker)
        p.wg.Add(1)
        go worker.Start(p.ctx)
    }
}

func (p *Pool) Submit(job Job) {
    p.jobs <- job
}

func (p *Pool) Results() <-chan Result {
    return p.results
}

func (p *Pool) Shutdown() {
    close(p.jobs)
    p.wg.Wait()
    close(p.results)
}

func main() {
    // Cria pool com 5 workers
    pool := NewPool(5)
    pool.Start()
    
    // Goroutine para coletar resultados
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for result := range pool.Results() {
            fmt.Printf("Resultado recebido: Job %d = %d\n", 
                result.JobID, result.Value)
        }
    }()
    
    // Envia 15 jobs
    for i := 1; i <= 15; i++ {
        job := Job{
            ID:   i,
            Data: fmt.Sprintf("Dados do job %d", i),
        }
        pool.Submit(job)
    }
    
    // Aguarda processamento
    pool.Shutdown()
    wg.Wait()
    
    fmt.Println("\nPool encerrado com sucesso!")
}

Quando Usar Worker Pool

Use quando…Evite quando…
Processar muitas tarefas com paralelismo limitadoVocê tem poucas tarefas (overhead não vale a pena)
Precisar de controle de recursosCada tarefa depende fortemente das outras
Implementar APIs com rate limitingExistem requirements de ordem estrita

2. Fan-Out / Fan-In Pattern

Fan-Out distribui trabalho entre múltiplas goroutines. Fan-In combina os resultados de volta em um único canal.

Problema

Processar dados de múltiplas fontes simultaneamente e agregar resultados.

Implementação

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Generator (Producer) - fan-out source
func Generator(id string, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            fmt.Printf("%s gerando: %d\n", id, n)
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            out <- n
        }
    }()
    return out
}

// Worker - processa dados (fan-out)
func Worker(id int, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            // Processamento pesado simulado
            result := n * n
            fmt.Printf("Worker %d: %d -> %d\n", id, n, result)
            time.Sleep(50 * time.Millisecond)
            out <- result
        }
    }()
    return out
}

// Merge - fan-in, combina múltiplos canais
func Merge(cs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    // Função auxiliar para copiar de um canal
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }
    
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }
    
    // Goroutine para fechar out quando todos terminarem
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// Demonstração do pattern
func main() {
    rand.Seed(time.Now().Unix())
    
    // === FAN-OUT ===
    // Múltiplas fontes gerando dados
    gen1 := Generator("Fonte-A", 1, 2, 3, 4, 5)
    gen2 := Generator("Fonte-B", 6, 7, 8, 9, 10)
    
    // Distribuir para múltiplos workers
    worker1 := Worker(1, gen1)
    worker2 := Worker(2, gen2)
    worker3 := Worker(3, gen1) // Reutiliza gen1 
    
    // === FAN-IN ===
    // Combinar resultados em um único canal
    merged := Merge(worker1, worker2, worker3)
    
    // Consumir resultados
    var sum int
    for result := range merged {
        sum += result
        fmt.Printf("Resultado recebido: %d (soma parcial: %d)\n", result, sum)
    }
    
    fmt.Printf("\nSoma total: %d\n", sum)
}

Aplicação Real: Web Crawler

// Crawler usa fan-out/fan-in para processar URLs
func Crawl(urls []string) []Page {
    var crawlers []<-chan Page
    
    // Fan-out: múltiplos crawlers
    for i := 0; i < numCrawlers; i++ {
        crawlers = append(crawlers, Crawler(urls[i::numCrawlers]))
    }
    
    // Fan-in: merge dos resultados
    pages := []Page{}
    for page := range Merge(crawlers...) {
        pages = append(pages, page)
    }
    return pages
}

3. Pipeline Pattern

Um Pipeline processa dados através de múltiplos estágios sequenciais, onde cada estágio pode executar concorrentemente.

Implementação de Pipeline

package main

import (
    "fmt"
    "strings"
)

// === ESTÁGIOS DO PIPELINE ===

// 1. Generator - produz dados
func Generator(files ...string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for _, file := range files {
            out <- file
        }
    }()
    return out
}

// 2. ReadFile - lê conteúdo
func ReadFile(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for filename := range in {
            // Simula leitura
            content := fmt.Sprintf("conteúdo do arquivo: %s", filename)
            out <- content
        }
    }()
    return out
}

// 3. ProcessContent - processa texto
func ProcessContent(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for content := range in {
            // Processamento: conta palavras
            words := strings.Fields(content)
            processed := fmt.Sprintf("%s | palavras: %d", 
                content, len(words))
            out <- processed
        }
    }()
    return out
}

// 4. SaveResults - salva resultados
func SaveResults(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for data := range in {
            // Simula salvamento
            saved := fmt.Sprintf("[SALVO] %s", data)
            out <- saved
        }
    }()
    return out
}

// Compose conecta estágios
func Compose(stages ...func(<-chan string) <-chan string) func(<-chan string) <-chan string {
    return func(source <-chan string) <-chan string {
        c := source
        for _, stage := range stages {
            c = stage(c)
        }
        return c
    }
}

func main() {
    // Compor pipeline
    pipeline := Compose(
        ReadFile,
        ProcessContent,
        SaveResults,
    )
    
    // Executar
    files := Generator("doc1.txt", "doc2.txt", "doc3.txt")
    results := pipeline(files)
    
    // Consumir
    for result := range results {
        fmt.Println(result)
    }
}

Pipeline Paralelo (Com Buffer)

Para melhorar throughput, adicionamos parallelism a estágios individuais:

// ParallelStage executa múltiplas instâncias de um estágio
func ParallelStage(workers int, stage func(<-chan string) <-chan string) 
    func(<-chan string) <-chan string {
    return func(in <-chan string) <-chan string {
        // Fan-out para múltiplos workers
        var workers_out []<-chan string
        for i := 0; i < workers; i++ {
            workers_out = append(workers_out, stage(in))
        }
        // Fan-in dos resultados
        return Merge(workers_out...)
    }
}

// Uso: paralelizar estágio de processamento
processParallel := ParallelStage(4, ProcessContent)

4. Rate Limiting com Channels

Controle de taxa é essencial para não sobrecarregar APIs ou serviços externos.

Rate Limiting Básico

package main

import (
    "context"
    "fmt"
    "time"
)

// RateLimiter controla requests por segundo
type RateLimiter struct {
    requests chan struct{}
    interval time.Duration
}

func NewRateLimiter(reqPerSec int) *RateLimiter {
    rl := &RateLimiter{
        requests: make(chan struct{}, reqPerSec),
        interval: time.Second / time.Duration(reqPerSec),
    }
    
    // Preenche o bucket
    for i := 0; i < reqPerSec; i++ {
        rl.requests <- struct{}{}
    }
    
    // Refill contínuo
    go rl.refill(reqPerSec)
    
    return rl
}

func (rl *RateLimiter) refill(reqPerSec int) {
    ticker := time.NewTicker(rl.interval)
    defer ticker.Stop()
    
    for range ticker.C {
        select {
        case rl.requests <- struct{}{}:
            // Adicionou token
        default:
            // Bucket cheio, ignora
        }
    }
}

func (rl *RateLimiter) Acquire(ctx context.Context) error {
    select {
    case <-rl.requests:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// Uso
func main() {
    // Permite 5 requisições por segundo
    limiter := NewRateLimiter(5)
    ctx := context.Background()
    
    for i := 1; i <= 15; i++ {
        if err := limiter.Acquire(ctx); err != nil {
            fmt.Printf("Erro: %v\n", err)
            return
        }
        fmt.Printf("Request %d executado em %s\n", i, time.Now().Format("15:04:05.000"))
    }
}

Rate Limiting Avançado: Token Bucket

package main

import (
    "context"
    "sync"
    "time"
)

// TokenBucket implementa token bucket algorithm
type TokenBucket struct {
    capacity int
    tokens   int
    refillRate time.Duration
    mu       sync.Mutex
    lastRefill time.Time
}

func NewTokenBucket(capacity int, refillRate time.Duration) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        tokens:     capacity,
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

func (tb *TokenBucket) refill() {
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill)
    
    // Calcular tokens a adicionar
    tokensToAdd := int(elapsed / tb.refillRate)
    if tokensToAdd > 0 {
        tb.tokens = min(tb.capacity, tb.tokens + tokensToAdd)
        tb.lastRefill = now
    }
}

func (tb *TokenBucket) TryAcquire() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    tb.refill()
    
    if tb.tokens > 0 {
        tb.tokens--
        return true
    }
    return false
}

func (tb *TokenBucket) Acquire(ctx context.Context) error {
    for {
        if tb.TryAcquire() {
            return nil
        }
        
        select {
        case <-time.After(tb.refillRate):
            // Tenta novamente após refill
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

5. Context Package para Cancellation

O context package é essencial para gerenciar timeouts e cancellation em operações concorrentes.

Padrões de Uso

package main

import (
    "context"
    "fmt"
    "time"
)

// ProcessTask com timeout
type Task struct {
    ID   int
    Data string
}

func ProcessTask(ctx context.Context, task Task) (string, error) {
    // Cria context com timeout específico para esta tarefa
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()
    
    result := make(chan string, 1)
    
    go func() {
        // Simula processamento pesado
        time.Sleep(time.Duration(100+task.ID*100) * time.Millisecond)
        result <- fmt.Sprintf("Task %d processada: %s", task.ID, task.Data)
    }()
    
    select {
    case res := <-result:
        return res, nil
    case <-ctx.Done():
        return "", fmt.Errorf("task %d cancelada: %w", task.ID, ctx.Err())
    }
}

// Worker Pool com Context
func WorkerPool(ctx context.Context, tasks []Task, workers int) []string {
    var wg sync.WaitGroup
    taskChan := make(chan Task, len(tasks))
    resultChan := make(chan string, len(tasks))
    
    // Start workers
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for task := range taskChan {
                select {
                case <-ctx.Done():
                    fmt.Printf("Worker %d: cancelado\n", id)
                    return
                default:
                    result, err := ProcessTask(ctx, task)
                    if err != nil {
                        resultChan <- fmt.Sprintf("Erro: %v", err)
                    } else {
                        resultChan <- result
                    }
                }
            }
        }(i)
    }
    
    // Send tasks
    go func() {
        for _, task := range tasks {
            select {
            case taskChan <- task:
            case <-ctx.Done():
                close(taskChan)
                return
            }
        }
        close(taskChan)
    }()
    
    // Wait and collect
    go func() {
        wg.Wait()
        close(resultChan)
    }()
    
    var results []string
    for res := range resultChan {
        results = append(results, res)
    }
    
    return results
}

// Propagation de Context
func ProcessWithChildContext(parentCtx context.Context) {
    // Context com valor
    ctx := context.WithValue(parentCtx, "user_id", "12345")
    
    // Context com deadline
    ctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second))
    defer cancel()
    
    // Passar para funções downstream
    ProcessTask(ctx, Task{ID: 1, Data: "teste"})
}

Graceful Shutdown

func GracefulServer() {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()
    
    srv := &http.Server{Addr: ":8080"}
    
    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Erro: %v\n", err)
        }
    }()
    
    fmt.Println("Servidor rodando. Pressione Ctrl+C para parar.")
    
    <-ctx.Done()
    fmt.Println("\nShutdown iniciado...")
    
    // Cleanup com timeout
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    if err := srv.Shutdown(shutdownCtx); err != nil {
        log.Printf("Erro no shutdown: %v", err)
    }
    
    fmt.Println("Servidor encerrado com sucesso")
}

6. Select Statement Avançado

O select statement permite esperar por múltiplas operações de channels.

Pattern: Non-Blocking Send/Receive

select {
case ch <- data:
    // Enviou com sucesso
default:
    // Channel cheio, não bloqueia
}

// Non-blocking receive
select {
case data := <-ch:
    // Recebeu dados
default:
    // Sem dados disponíveis
}

Pattern: Timeout

func WithTimeout() {
    ch := make(chan string)
    
    select {
    case result := <-ch:
        fmt.Println(result)
    case <-time.After(5 * time.Second):
        fmt.Println("Timeout!")
    }
}

Pattern: Priority Select

func PrioritySelect(hi, lo <-chan string) {
    for {
        select {
        case msg := <-hi:
            // Alta prioridade
            fmt.Printf("HI: %s\n", msg)
        default:
            select {
            case msg := <-hi:
                fmt.Printf("HI: %s\n", msg)
            case msg := <-lo:
                fmt.Printf("LO: %s\n", msg)
            }
        }
    }
}

Pattern: Sensor/Aggregator

func Aggregator(sensors ...<-chan Reading) <-chan Alert {
    alerts := make(chan Alert)
    
    go func() {
        defer close(alerts)
        
        // Timeout entre leituras
        timeout := time.NewTimer(1 * time.Minute)
        defer timeout.Stop()
        
        for {
            select {
            case reading := <-sensors[0]:
                if reading.Temperature > 100 {
                    alerts <- Alert{Sensor: 0, Level: Critical}
                }
                timeout.Reset(1 * time.Minute)
                
            case reading := <-sensors[1]:
                if reading.Pressure < 10 {
                    alerts <- Alert{Sensor: 1, Level: Warning}
                }
                timeout.Reset(1 * time.Minute)
                
            case <-timeout.C:
                alerts <- Alert{Level: Timeout, Message: "Sem dados"}
                return
            }
        }
    }()
    
    return alerts
}

Comparação com Outras Linguagens

LanguageConcurrency ModelPrimitivasNota
GoCSP (Communicating Sequential Processes)Goroutines, ChannelsSimples, built-in
Rustasync/await + ownershipTokio, async-stdSeguro em compile-time
JavaThreads + ExecutorsCompletableFuture, StreamsMais verboso
Pythonasync/await (single-threaded)asyncio, coroutinesGIL limita parallelism
Node.jsEvent loopPromises, async/awaitSingle-threaded

Vantagens de Go:

  • Goroutines são leves (2KB stack inicial)
  • Channels são type-safe
  • Garbage collector gerencia goroutines
  • Sem complexidade de ownership (vs Rust)

Melhores Práticas

✅ Faça

  1. Sempre use defer close(ch) quando apropriado
  2. Use context para timeout e cancellation
  3. Adicione buffer quando conhecer a carga
  4. Documente se um channel é usado para sinalização ou dados
  5. Use select para timeouts e multiplexação

❌ Evite

// ❌ Não: partilhar memória por comunicação
var counter int
var mu sync.Mutex

go func() {
    mu.Lock()
    counter++
    mu.Unlock()
}()

// ✅ Sim: comunicar por channels
updates := make(chan int)
go func() {
    updates <- 1
}()
counter += <-updates
// ❌ Não: deixar goroutines orfãs
go longRunningTask()
// Sem forma de cancelar!

// ✅ Sim: usar context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go longRunningTask(ctx)

Exercícios Práticos

1. Implemente um Throttler

Crie um rate limiter que permite “bursts” de requests.

Ver solução
type Throttler struct {
    maxBurst int
    tokens   chan struct{}
}

func NewThrottler(maxBurst int, refillRate time.Duration) *Throttler {
    t := &Throttler{
        maxBurst: maxBurst,
        tokens:   make(chan struct{}, maxBurst),
    }
    
    // Pre-fill
    for i := 0; i < maxBurst; i++ {
        t.tokens <- struct{}{}
    }
    
    go t.refill(refillRate)
    return t
}

2. Merge com Prioridade

Modifique o Merge para priorizar canais específicos.

3. Pipeline com Error Handling

Adicione tratamento de errors a cada estágio do pipeline.

Conclusão

Os patterns de concorrência de Go são poderosos e expressivos:

Worker Pools para controle de recursos
Fan-Out/Fan-In para processamento paralelo
Pipelines para fluxos de processamento
Rate Limiting para proteção de recursos
Context para cancellation e timeouts
Select para multiplexação elegante

Próximos Passos

Recursos


Tem dúvidas sobre concorrência em Go? Participe da nossa comunidade no Discord!

Para ampliar sua visão sobre concorrência, compare os padrões de Go com o modelo de ownership em Rust vs Go (que garante thread safety em tempo de compilação) e o async/await de Python com asyncio.