Go e Temporal: Workflows Resilientes e Duráveis

Temporal é uma plataforma de orquestração de workflows que permite criar processos de negócio resilientes, duráveis e escaláveis. Desenvolvido pelos criadores do Cadence (Uber), é amplamente adotado por empresas como Netflix, Stripe e Shopify.

Neste guia, você aprenderá a construir workflows complexos em Go com garantias de execução, retries automáticos e compensação (Saga pattern).

Índice

  1. Conceitos Fundamentais
  2. Configuração do Ambiente
  3. Primeiro Workflow
  4. Activities e Retries
  5. Saga Pattern
  6. Signals e Queries
  7. Timers e Cron Jobs
  8. Padrões Avançados

Conceitos Fundamentais

Arquitetura Temporal

┌─────────────────────────────────────────────────────────────┐
│                   Temporal Server                          │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │  Frontend    │  │   History    │  │   Matching   │      │
│  │   (gRPC)     │  │   (Events)   │  │   (Tasks)    │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
│                                                             │
│  Persistence: PostgreSQL / MySQL / Cassandra / Elasticsearch│
└─────────────────────────────────────────────────────────────┘
           │ gRPC
┌─────────────────────────────────────────────────────────────┐
│                   Workers (Go)                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │   Workflow   │  │   Activity   │  │   Activity   │      │
│  │   Engine     │  │   Executor   │  │   Executor   │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
│                                                             │
│  Workflows são determinísticos (replayable)                │
│  Activities contêm side-effects (I/O, APIs)                │
└─────────────────────────────────────────────────────────────┘

Conceitos Chave

Workflows: Funções Go que definem a lógica de negócio. São determinísticas e podem ser “replayed” indefinidamente.

Activities: Funções que contêm side-effects (chamadas HTTP, acesso a DB, etc). São executadas uma única vez e podem falhar.

Workers: Processos que executam workflows e activities. Escalam horizontalmente.

Tasks: Unidades de trabalho atribuídas aos workers pelo servidor Temporal.


Configuração do Ambiente

Docker Compose

# docker-compose.yml
version: '3.8'

services:
  temporal:
    image: temporalio/auto-setup:latest
    ports:
      - "7233:7233"      # Frontend gRPC
      - "8233:8233"      # Web UI
    environment:
      - DB=postgresql
      - DB_PORT=5432
      - POSTGRES_USER=temporal
      - POSTGRES_PWD=temporal
      - POSTGRES_SEEDS=postgresql
      - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml
    depends_on:
      - postgresql

  postgresql:
    image: postgres:13
    environment:
      POSTGRES_USER: temporal
      POSTGRES_PASSWORD: temporal
      POSTGRES_DB: temporal
    ports:
      - "5432:5432"

  temporal-ui:
    image: temporalio/ui:latest
    ports:
      - "8080:8080"
    environment:
      - TEMPORAL_ADDRESS=temporal:7233
      - TEMPORAL_CORS_ORIGINS=http://localhost:3000
# Iniciar Temporal
docker-compose up -d

# Acessar UI
open http://localhost:8080

Instalação do SDK Go

go get go.temporal.io/sdk
go get go.temporal.io/sdk/client
go get go.temporal.io/sdk/worker

Primeiro Workflow

Cliente Temporal

package temporal

import (
    "context"
    "fmt"

    "go.temporal.io/sdk/client"
)

func NewClient() (client.Client, error) {
    c, err := client.Dial(client.Options{
        HostPort: "localhost:7233",
        Namespace: "default",
    })
    if err != nil {
        return nil, fmt.Errorf("falha ao conectar no Temporal: %w", err)
    }
    return c, nil
}

Workflow Simples

package workflows

import (
    "time"

    "go.temporal.io/sdk/workflow"
)

// SimpleWorkflow é um workflow básico
type SimpleWorkflow struct{}

// Input e Output
type SimpleInput struct {
    Name string
}

type SimpleOutput struct {
    Message string
}

func (w *SimpleWorkflow) Execute(ctx workflow.Context, input SimpleInput) (SimpleOutput, error) {
    // Configura opções de retry para activities
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    time.Minute,
            MaximumAttempts:    3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // Executa activity
    var result string
    err := workflow.ExecuteActivity(ctx, activities.Greet, input.Name).Get(ctx, &result)
    if err != nil {
        return SimpleOutput{}, err
    }

    return SimpleOutput{Message: result}, nil
}

Activity

package activities

import (
    "context"
    "fmt"
)

// Activities contém todas as activities do sistema
type Activities struct{}

func NewActivities() *Activities {
    return &Activities{}
}

func (a *Activities) Greet(ctx context.Context, name string) (string, error) {
    // Simula chamada externa
    return fmt.Sprintf("Olá, %s!", name), nil
}

func (a *Activities) ProcessPayment(ctx context.Context, orderID string, amount float64) error {
    // Integração com gateway de pagamento
    return nil
}

func (a *Activities) SendEmail(ctx context.Context, to, subject, body string) error {
    // Integração com serviço de email
    return nil
}

Worker

package worker

import (
    "go.temporal.io/sdk/worker"
    "go.temporal.io/sdk/client"
    
    "myapp/workflows"
    "myapp/activities"
)

func StartWorker(c client.Client) worker.Worker {
    w := worker.New(c, "task-queue-name", worker.Options{})

    // Registra workflows
    w.RegisterWorkflow(workflows.SimpleWorkflow{}.Execute)

    // Registra activities
    acts := activities.NewActivities()
    w.RegisterActivity(acts.Greet)
    w.RegisterActivity(acts.ProcessPayment)
    w.RegisterActivity(acts.SendEmail)

    // Inicia worker em goroutine
    go func() {
        err := w.Run(worker.InterruptCh())
        if err != nil {
            log.Fatal(err)
        }
    }()

    return w
}

Iniciando um Workflow

package main

import (
    "context"
    "fmt"
    "log"

    "go.temporal.io/sdk/client"
    
    "myapp/temporal"
    "myapp/workflows"
)

func main() {
    c, err := temporal.NewClient()
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    // Inicia workflow
    options := client.StartWorkflowOptions{
        ID:        "simple-workflow-001",
        TaskQueue: "task-queue-name",
    }

    input := workflows.SimpleInput{Name: "Temporal"}
    
    we, err := c.ExecuteWorkflow(context.Background(), options, 
        workflows.SimpleWorkflow{}.Execute, input)
    if err != nil {
        log.Fatal(err)
    }

    // Aguarda resultado
    var result workflows.SimpleOutput
    err = we.Get(context.Background(), &result)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Resultado: %s\n", result.Message)
}

Activities e Retries

Retry Policies

func (w *OrderWorkflow) Execute(ctx workflow.Context, input OrderInput) (OrderOutput, error) {
    // Retry padrão
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    10 * time.Second,
            MaximumAttempts:    3,
            NonRetryableErrorTypes: []string{"InvalidPaymentError"},
        },
    }

    // Sem retry (para operações não idempotentes)
    aoNoRetry := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts: 1,
        },
    }

    // Retry infinito com backoff exponencial
    aoInfinite := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    5 * time.Minute,
            // MaximumAttempts omitido = retry infinito
        },
    }
}

Heartbeats para Activities Longas

func (a *Activities) LongRunningProcess(ctx context.Context, items []Item) error {
    // Registra heartbeat a cada 10s
    for i, item := range items {
        // Processa item
        if err := process(item); err != nil {
            return err
        }

        // Envia heartbeat
        activity.RecordHeartbeat(ctx, i+1, len(items))
    }
    return nil
}

// Configuração do workflow
func (w *Workflow) Execute(ctx workflow.Context, items []Item) error {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 24 * time.Hour, // Activity pode durar muito
        HeartbeatTimeout:    30 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    return workflow.ExecuteActivity(ctx, a.LongRunningProcess, items).Get(ctx, nil)
}

Saga Pattern

Implementação de Saga

package workflows

import (
    "fmt"

    "go.temporal.io/sdk/workflow"
)

// Saga gerencia compensações
type Saga struct {
    compensations []func() error
}

func NewSaga() *Saga {
    return &Saga{
        compensations: make([]func() error, 0),
    }
}

func (s *Saga) AddCompensation(fn func() error) {
    s.compensations = append(s.compensations, fn)
}

func (s *Saga) Compensate(ctx workflow.Context) error {
    // Executa compensações na ordem inversa
    for i := len(s.compensations) - 1; i >= 0; i-- {
        if err := s.compensations[i](); err != nil {
            workflow.GetLogger(ctx).Error("Compensation failed", "error", err)
            // Continua tentando outras compensações
        }
    }
    return nil
}

// OrderSagaWorkflow executa saga de pedido
func OrderSagaWorkflow(ctx workflow.Context, input OrderInput) error {
    saga := NewSaga()
    acts := activities.New()

    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // Step 1: Reservar estoque
    err := workflow.ExecuteActivity(ctx, acts.ReserveInventory, input.Items).Get(ctx, nil)
    if err != nil {
        return fmt.Errorf("falha ao reservar estoque: %w", err)
    }
    saga.AddCompensation(func() error {
        return workflow.ExecuteActivity(ctx, acts.ReleaseInventory, input.Items).Get(ctx, nil)
    })

    // Step 2: Processar pagamento
    var paymentID string
    err = workflow.ExecuteActivity(ctx, acts.ProcessPayment, input.Total).Get(ctx, &paymentID)
    if err != nil {
        saga.Compensate(ctx)
        return fmt.Errorf("falha no pagamento: %w", err)
    }
    saga.AddCompensation(func() error {
        return workflow.ExecuteActivity(ctx, acts.RefundPayment, paymentID).Get(ctx, nil)
    })

    // Step 3: Criar ordem de envio
    err = workflow.ExecuteActivity(ctx, acts.CreateShipment, input.Address).Get(ctx, nil)
    if err != nil {
        saga.Compensate(ctx)
        return fmt.Errorf("falha ao criar envio: %w", err)
    }
    saga.AddCompensation(func() error {
        return workflow.ExecuteActivity(ctx, acts.CancelShipment, input.Address).Get(ctx, nil)
    })

    // Step 4: Enviar confirmação
    err = workflow.ExecuteActivity(ctx, acts.SendConfirmationEmail, input.Email).Get(ctx, nil)
    if err != nil {
        // Email não requer compensação
        workflow.GetLogger(ctx).Warn("Falha ao enviar email", "error", err)
    }

    return nil
}

Signals e Queries

Signals (Comunicação Externa → Workflow)

// OrderWorkflow com signals
type OrderWorkflow struct {
    state      OrderState
    approved   bool
    cancelChan workflow.ReceiveChannel
}

func OrderWithApprovalWorkflow(ctx workflow.Context, input OrderInput) (OrderOutput, error) {
    w := &OrderWorkflow{
        state: OrderStatePending,
    }

    // Configura signal handlers
    selector := workflow.NewSelector(ctx)

    // Signal de aprovação
    approveChan := workflow.GetSignalChannel(ctx, "approve")
    selector.AddReceive(approveChan, func(c workflow.ReceiveChannel, more bool) {
        var approval ApprovalInfo
        c.Receive(ctx, &approval)
        w.approved = true
        w.state = OrderStateApproved
    })

    // Signal de cancelamento
    cancelChan := workflow.GetSignalChannel(ctx, "cancel")
    selector.AddReceive(cancelChan, func(c workflow.ReceiveChannel, more bool) {
        w.state = OrderStateCancelled
    })

    // Timer de timeout
    timer := workflow.NewTimer(ctx, 24*time.Hour)
    selector.AddFuture(timer, func(f workflow.Future) {
        w.state = OrderStateExpired
    })

    // Aguarda um dos eventos
    selector.Select(ctx)

    switch w.state {
    case OrderStateApproved:
        // Continua com processamento
        return processApprovedOrder(ctx, input)
    case OrderStateCancelled:
        return OrderOutput{Status: "cancelled"}, nil
    case OrderStateExpired:
        return OrderOutput{Status: "expired"}, nil
    default:
        return OrderOutput{}, fmt.Errorf("estado inesperado")
    }
}

// Enviar signal
c.SignalWorkflow(ctx, workflowID, "", "approve", ApprovalInfo{
    ApprovedBy: "manager@example.com",
    ApprovedAt: time.Now(),
})

Queries (Consultar Estado do Workflow)

func init() {
    workflow.RegisterQueryType("getOrderStatus", GetOrderStatus)
}

func OrderWorkflow(ctx workflow.Context, input OrderInput) error {
    state := &OrderState{Status: "created"}

    // Registra query handler
    err := workflow.SetQueryHandler(ctx, "getOrderStatus", func() (OrderStatus, error) {
        return OrderStatus{
            ID:        input.ID,
            Status:    state.Status,
            UpdatedAt: state.UpdatedAt,
        }, nil
    })
    if err != nil {
        return err
    }

    // Resto do workflow...
}

// Consultar workflow em execução
response, err := c.QueryWorkflow(ctx, workflowID, "", "getOrderStatus")
if err != nil {
    log.Fatal(err)
}

var status OrderStatus
err = response.Get(&status)

Timers e Cron Jobs

Timers

func ReminderWorkflow(ctx workflow.Context, input ReminderInput) error {
    // Timer simples
    workflow.Sleep(ctx, 1*time.Hour)
    
    // Timer cancelável
    timer := workflow.NewTimer(ctx, 30*time.Minute)
    
    selector := workflow.NewSelector(ctx)
    selector.AddFuture(timer, func(f workflow.Future) {
        // Timer disparou
        sendReminder()
    })
    
    // Ou aguarda signal para cancelar
    cancelChan := workflow.GetSignalChannel(ctx, "cancel-reminder")
    selector.AddReceive(cancelChan, func(c workflow.ReceiveChannel, more bool) {
        // Cancela timer
    })
    
    selector.Select(ctx)
    return nil
}

Cron Workflows

// ScheduleOptions para execução periódica
func StartCronWorkflow(c client.Client) error {
    schedule, err := c.ScheduleClient().Create(ctx, client.ScheduleOptions{
        ID: "daily-report-schedule",
        Spec: client.ScheduleSpec{
            CronExpressions: []string{"0 9 * * *"}, // 9h todo dia
        },
        Action: &client.ScheduleWorkflowAction{
            Workflow:  DailyReportWorkflow,
            TaskQueue: "report-queue",
            Args:      []interface{}{ReportInput{Type: "daily"}},
        },
    })
    if err != nil {
        return err
    }

    return nil
}

func DailyReportWorkflow(ctx workflow.Context, input ReportInput) error {
    // Gera relatório diário
    return nil
}

Padrões Avançados

Child Workflows

func ParentWorkflow(ctx workflow.Context, orders []Order) error {
    futures := make([]workflow.ChildWorkflowFuture, len(orders))

    // Inicia child workflows em paralelo
    for i, order := range orders {
        childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
            WorkflowID: fmt.Sprintf("order-%s", order.ID),
        })
        
        futures[i] = workflow.ExecuteChildWorkflow(childCtx, OrderWorkflow, order)
    }

    // Aguarda todos completarem
    for i, future := range futures {
        var result OrderResult
        if err := future.Get(ctx, &result); err != nil {
            // Handle error
        }
    }

    return nil
}

Continue-As-New

func LongRunningWorkflow(ctx workflow.Context, state WorkflowState) error {
    // Processa batch
    for i := 0; i < 100; i++ {
        if err := processItem(ctx, state.NextItem); err != nil {
            return err
        }
        state.NextItem++
        state.ProcessedCount++

        // Limita history size
        if state.ProcessedCount >= 1000 {
            // Continua como novo workflow
            return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
        }
    }

    return nil
}

Mutex Pattern

func MutexWorkflow(ctx workflow.Context, resourceID string, work func() error) error {
    // Adquire lock
    err := workflow.ExecuteActivity(ctx, AcquireLock, resourceID).Get(ctx, nil)
    if err != nil {
        return err
    }

    // Garante release
    defer func() {
        _ = workflow.ExecuteActivity(ctx, ReleaseLock, resourceID).Get(ctx, nil)
    }()

    return work()
}

Conclusão

Neste guia, você aprendeu:

Fundamentos: Workflows, Activities, Workers ✅ Resiliência: Retries, timeouts, heartbeats ✅ Saga Pattern: Transações distribuídas com compensação ✅ Comunicação: Signals e Queries ✅ Agendamento: Timers e Cron jobs ✅ Escalabilidade: Child workflows, Continue-As-New

Próximos Passos

  1. Go e Dapr - Distributed Application Runtime
  2. Go Microservices - Arquitetura de microsserviços
  3. Go Observability - Monitoramento de workflows

FAQ

Q: Qual a diferença entre Temporal e Cadence? R: Temporal é um fork do Cadence com foco em cloud-native e multi-tenancy. Compartilham a maioria dos conceitos.

Q: Posso usar Temporal com outras linguagens? R: Sim! SDKs disponíveis para Go, Java, TypeScript, Python, .NET, PHP e Rust.

Q: Quanto tempo um workflow pode durar? R: Ilimitado. Workflows podem executar por anos, com event history ilimitado (usa Continue-As-New).

Q: É necessário usar o servidor Temporal? R: Sim, o servidor gerencia estado e orquestração. Pode ser self-hosted ou usar Temporal Cloud.