Go e Temporal: Workflows Resilientes e Duráveis
Temporal é uma plataforma de orquestração de workflows que permite criar processos de negócio resilientes, duráveis e escaláveis. Desenvolvido pelos criadores do Cadence (Uber), é amplamente adotado por empresas como Netflix, Stripe e Shopify.
Neste guia, você aprenderá a construir workflows complexos em Go com garantias de execução, retries automáticos e compensação (Saga pattern).
Índice
- Conceitos Fundamentais
- Configuração do Ambiente
- Primeiro Workflow
- Activities e Retries
- Saga Pattern
- Signals e Queries
- Timers e Cron Jobs
- Padrões Avançados
Conceitos Fundamentais
Arquitetura Temporal
┌─────────────────────────────────────────────────────────────┐
│ Temporal Server │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Frontend │ │ History │ │ Matching │ │
│ │ (gRPC) │ │ (Events) │ │ (Tasks) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ Persistence: PostgreSQL / MySQL / Cassandra / Elasticsearch│
└─────────────────────────────────────────────────────────────┘
│
│ gRPC
▼
┌─────────────────────────────────────────────────────────────┐
│ Workers (Go) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Workflow │ │ Activity │ │ Activity │ │
│ │ Engine │ │ Executor │ │ Executor │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ Workflows são determinísticos (replayable) │
│ Activities contêm side-effects (I/O, APIs) │
└─────────────────────────────────────────────────────────────┘
Conceitos Chave
Workflows: Funções Go que definem a lógica de negócio. São determinísticas e podem ser “replayed” indefinidamente.
Activities: Funções que contêm side-effects (chamadas HTTP, acesso a DB, etc). São executadas uma única vez e podem falhar.
Workers: Processos que executam workflows e activities. Escalam horizontalmente.
Tasks: Unidades de trabalho atribuídas aos workers pelo servidor Temporal.
Configuração do Ambiente
Docker Compose
# docker-compose.yml
version: '3.8'
services:
temporal:
image: temporalio/auto-setup:latest
ports:
- "7233:7233" # Frontend gRPC
- "8233:8233" # Web UI
environment:
- DB=postgresql
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_SEEDS=postgresql
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml
depends_on:
- postgresql
postgresql:
image: postgres:13
environment:
POSTGRES_USER: temporal
POSTGRES_PASSWORD: temporal
POSTGRES_DB: temporal
ports:
- "5432:5432"
temporal-ui:
image: temporalio/ui:latest
ports:
- "8080:8080"
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CORS_ORIGINS=http://localhost:3000
# Iniciar Temporal
docker-compose up -d
# Acessar UI
open http://localhost:8080
Instalação do SDK Go
go get go.temporal.io/sdk
go get go.temporal.io/sdk/client
go get go.temporal.io/sdk/worker
Primeiro Workflow
Cliente Temporal
package temporal
import (
"context"
"fmt"
"go.temporal.io/sdk/client"
)
func NewClient() (client.Client, error) {
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "default",
})
if err != nil {
return nil, fmt.Errorf("falha ao conectar no Temporal: %w", err)
}
return c, nil
}
Workflow Simples
package workflows
import (
"time"
"go.temporal.io/sdk/workflow"
)
// SimpleWorkflow é um workflow básico
type SimpleWorkflow struct{}
// Input e Output
type SimpleInput struct {
Name string
}
type SimpleOutput struct {
Message string
}
func (w *SimpleWorkflow) Execute(ctx workflow.Context, input SimpleInput) (SimpleOutput, error) {
// Configura opções de retry para activities
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Executa activity
var result string
err := workflow.ExecuteActivity(ctx, activities.Greet, input.Name).Get(ctx, &result)
if err != nil {
return SimpleOutput{}, err
}
return SimpleOutput{Message: result}, nil
}
Activity
package activities
import (
"context"
"fmt"
)
// Activities contém todas as activities do sistema
type Activities struct{}
func NewActivities() *Activities {
return &Activities{}
}
func (a *Activities) Greet(ctx context.Context, name string) (string, error) {
// Simula chamada externa
return fmt.Sprintf("Olá, %s!", name), nil
}
func (a *Activities) ProcessPayment(ctx context.Context, orderID string, amount float64) error {
// Integração com gateway de pagamento
return nil
}
func (a *Activities) SendEmail(ctx context.Context, to, subject, body string) error {
// Integração com serviço de email
return nil
}
Worker
package worker
import (
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/client"
"myapp/workflows"
"myapp/activities"
)
func StartWorker(c client.Client) worker.Worker {
w := worker.New(c, "task-queue-name", worker.Options{})
// Registra workflows
w.RegisterWorkflow(workflows.SimpleWorkflow{}.Execute)
// Registra activities
acts := activities.NewActivities()
w.RegisterActivity(acts.Greet)
w.RegisterActivity(acts.ProcessPayment)
w.RegisterActivity(acts.SendEmail)
// Inicia worker em goroutine
go func() {
err := w.Run(worker.InterruptCh())
if err != nil {
log.Fatal(err)
}
}()
return w
}
Iniciando um Workflow
package main
import (
"context"
"fmt"
"log"
"go.temporal.io/sdk/client"
"myapp/temporal"
"myapp/workflows"
)
func main() {
c, err := temporal.NewClient()
if err != nil {
log.Fatal(err)
}
defer c.Close()
// Inicia workflow
options := client.StartWorkflowOptions{
ID: "simple-workflow-001",
TaskQueue: "task-queue-name",
}
input := workflows.SimpleInput{Name: "Temporal"}
we, err := c.ExecuteWorkflow(context.Background(), options,
workflows.SimpleWorkflow{}.Execute, input)
if err != nil {
log.Fatal(err)
}
// Aguarda resultado
var result workflows.SimpleOutput
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Resultado: %s\n", result.Message)
}
Activities e Retries
Retry Policies
func (w *OrderWorkflow) Execute(ctx workflow.Context, input OrderInput) (OrderOutput, error) {
// Retry padrão
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 10 * time.Second,
MaximumAttempts: 3,
NonRetryableErrorTypes: []string{"InvalidPaymentError"},
},
}
// Sem retry (para operações não idempotentes)
aoNoRetry := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 1,
},
}
// Retry infinito com backoff exponencial
aoInfinite := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: 5 * time.Minute,
// MaximumAttempts omitido = retry infinito
},
}
}
Heartbeats para Activities Longas
func (a *Activities) LongRunningProcess(ctx context.Context, items []Item) error {
// Registra heartbeat a cada 10s
for i, item := range items {
// Processa item
if err := process(item); err != nil {
return err
}
// Envia heartbeat
activity.RecordHeartbeat(ctx, i+1, len(items))
}
return nil
}
// Configuração do workflow
func (w *Workflow) Execute(ctx workflow.Context, items []Item) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour, // Activity pode durar muito
HeartbeatTimeout: 30 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
return workflow.ExecuteActivity(ctx, a.LongRunningProcess, items).Get(ctx, nil)
}
Saga Pattern
Implementação de Saga
package workflows
import (
"fmt"
"go.temporal.io/sdk/workflow"
)
// Saga gerencia compensações
type Saga struct {
compensations []func() error
}
func NewSaga() *Saga {
return &Saga{
compensations: make([]func() error, 0),
}
}
func (s *Saga) AddCompensation(fn func() error) {
s.compensations = append(s.compensations, fn)
}
func (s *Saga) Compensate(ctx workflow.Context) error {
// Executa compensações na ordem inversa
for i := len(s.compensations) - 1; i >= 0; i-- {
if err := s.compensations[i](); err != nil {
workflow.GetLogger(ctx).Error("Compensation failed", "error", err)
// Continua tentando outras compensações
}
}
return nil
}
// OrderSagaWorkflow executa saga de pedido
func OrderSagaWorkflow(ctx workflow.Context, input OrderInput) error {
saga := NewSaga()
acts := activities.New()
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
// Step 1: Reservar estoque
err := workflow.ExecuteActivity(ctx, acts.ReserveInventory, input.Items).Get(ctx, nil)
if err != nil {
return fmt.Errorf("falha ao reservar estoque: %w", err)
}
saga.AddCompensation(func() error {
return workflow.ExecuteActivity(ctx, acts.ReleaseInventory, input.Items).Get(ctx, nil)
})
// Step 2: Processar pagamento
var paymentID string
err = workflow.ExecuteActivity(ctx, acts.ProcessPayment, input.Total).Get(ctx, &paymentID)
if err != nil {
saga.Compensate(ctx)
return fmt.Errorf("falha no pagamento: %w", err)
}
saga.AddCompensation(func() error {
return workflow.ExecuteActivity(ctx, acts.RefundPayment, paymentID).Get(ctx, nil)
})
// Step 3: Criar ordem de envio
err = workflow.ExecuteActivity(ctx, acts.CreateShipment, input.Address).Get(ctx, nil)
if err != nil {
saga.Compensate(ctx)
return fmt.Errorf("falha ao criar envio: %w", err)
}
saga.AddCompensation(func() error {
return workflow.ExecuteActivity(ctx, acts.CancelShipment, input.Address).Get(ctx, nil)
})
// Step 4: Enviar confirmação
err = workflow.ExecuteActivity(ctx, acts.SendConfirmationEmail, input.Email).Get(ctx, nil)
if err != nil {
// Email não requer compensação
workflow.GetLogger(ctx).Warn("Falha ao enviar email", "error", err)
}
return nil
}
Signals e Queries
Signals (Comunicação Externa → Workflow)
// OrderWorkflow com signals
type OrderWorkflow struct {
state OrderState
approved bool
cancelChan workflow.ReceiveChannel
}
func OrderWithApprovalWorkflow(ctx workflow.Context, input OrderInput) (OrderOutput, error) {
w := &OrderWorkflow{
state: OrderStatePending,
}
// Configura signal handlers
selector := workflow.NewSelector(ctx)
// Signal de aprovação
approveChan := workflow.GetSignalChannel(ctx, "approve")
selector.AddReceive(approveChan, func(c workflow.ReceiveChannel, more bool) {
var approval ApprovalInfo
c.Receive(ctx, &approval)
w.approved = true
w.state = OrderStateApproved
})
// Signal de cancelamento
cancelChan := workflow.GetSignalChannel(ctx, "cancel")
selector.AddReceive(cancelChan, func(c workflow.ReceiveChannel, more bool) {
w.state = OrderStateCancelled
})
// Timer de timeout
timer := workflow.NewTimer(ctx, 24*time.Hour)
selector.AddFuture(timer, func(f workflow.Future) {
w.state = OrderStateExpired
})
// Aguarda um dos eventos
selector.Select(ctx)
switch w.state {
case OrderStateApproved:
// Continua com processamento
return processApprovedOrder(ctx, input)
case OrderStateCancelled:
return OrderOutput{Status: "cancelled"}, nil
case OrderStateExpired:
return OrderOutput{Status: "expired"}, nil
default:
return OrderOutput{}, fmt.Errorf("estado inesperado")
}
}
// Enviar signal
c.SignalWorkflow(ctx, workflowID, "", "approve", ApprovalInfo{
ApprovedBy: "manager@example.com",
ApprovedAt: time.Now(),
})
Queries (Consultar Estado do Workflow)
func init() {
workflow.RegisterQueryType("getOrderStatus", GetOrderStatus)
}
func OrderWorkflow(ctx workflow.Context, input OrderInput) error {
state := &OrderState{Status: "created"}
// Registra query handler
err := workflow.SetQueryHandler(ctx, "getOrderStatus", func() (OrderStatus, error) {
return OrderStatus{
ID: input.ID,
Status: state.Status,
UpdatedAt: state.UpdatedAt,
}, nil
})
if err != nil {
return err
}
// Resto do workflow...
}
// Consultar workflow em execução
response, err := c.QueryWorkflow(ctx, workflowID, "", "getOrderStatus")
if err != nil {
log.Fatal(err)
}
var status OrderStatus
err = response.Get(&status)
Timers e Cron Jobs
Timers
func ReminderWorkflow(ctx workflow.Context, input ReminderInput) error {
// Timer simples
workflow.Sleep(ctx, 1*time.Hour)
// Timer cancelável
timer := workflow.NewTimer(ctx, 30*time.Minute)
selector := workflow.NewSelector(ctx)
selector.AddFuture(timer, func(f workflow.Future) {
// Timer disparou
sendReminder()
})
// Ou aguarda signal para cancelar
cancelChan := workflow.GetSignalChannel(ctx, "cancel-reminder")
selector.AddReceive(cancelChan, func(c workflow.ReceiveChannel, more bool) {
// Cancela timer
})
selector.Select(ctx)
return nil
}
Cron Workflows
// ScheduleOptions para execução periódica
func StartCronWorkflow(c client.Client) error {
schedule, err := c.ScheduleClient().Create(ctx, client.ScheduleOptions{
ID: "daily-report-schedule",
Spec: client.ScheduleSpec{
CronExpressions: []string{"0 9 * * *"}, // 9h todo dia
},
Action: &client.ScheduleWorkflowAction{
Workflow: DailyReportWorkflow,
TaskQueue: "report-queue",
Args: []interface{}{ReportInput{Type: "daily"}},
},
})
if err != nil {
return err
}
return nil
}
func DailyReportWorkflow(ctx workflow.Context, input ReportInput) error {
// Gera relatório diário
return nil
}
Padrões Avançados
Child Workflows
func ParentWorkflow(ctx workflow.Context, orders []Order) error {
futures := make([]workflow.ChildWorkflowFuture, len(orders))
// Inicia child workflows em paralelo
for i, order := range orders {
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowID: fmt.Sprintf("order-%s", order.ID),
})
futures[i] = workflow.ExecuteChildWorkflow(childCtx, OrderWorkflow, order)
}
// Aguarda todos completarem
for i, future := range futures {
var result OrderResult
if err := future.Get(ctx, &result); err != nil {
// Handle error
}
}
return nil
}
Continue-As-New
func LongRunningWorkflow(ctx workflow.Context, state WorkflowState) error {
// Processa batch
for i := 0; i < 100; i++ {
if err := processItem(ctx, state.NextItem); err != nil {
return err
}
state.NextItem++
state.ProcessedCount++
// Limita history size
if state.ProcessedCount >= 1000 {
// Continua como novo workflow
return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
}
}
return nil
}
Mutex Pattern
func MutexWorkflow(ctx workflow.Context, resourceID string, work func() error) error {
// Adquire lock
err := workflow.ExecuteActivity(ctx, AcquireLock, resourceID).Get(ctx, nil)
if err != nil {
return err
}
// Garante release
defer func() {
_ = workflow.ExecuteActivity(ctx, ReleaseLock, resourceID).Get(ctx, nil)
}()
return work()
}
Conclusão
Neste guia, você aprendeu:
✅ Fundamentos: Workflows, Activities, Workers ✅ Resiliência: Retries, timeouts, heartbeats ✅ Saga Pattern: Transações distribuídas com compensação ✅ Comunicação: Signals e Queries ✅ Agendamento: Timers e Cron jobs ✅ Escalabilidade: Child workflows, Continue-As-New
Próximos Passos
- Go e Dapr - Distributed Application Runtime
- Go Microservices - Arquitetura de microsserviços
- Go Observability - Monitoramento de workflows
FAQ
Q: Qual a diferença entre Temporal e Cadence? R: Temporal é um fork do Cadence com foco em cloud-native e multi-tenancy. Compartilham a maioria dos conceitos.
Q: Posso usar Temporal com outras linguagens? R: Sim! SDKs disponíveis para Go, Java, TypeScript, Python, .NET, PHP e Rust.
Q: Quanto tempo um workflow pode durar? R: Ilimitado. Workflows podem executar por anos, com event history ilimitado (usa Continue-As-New).
Q: É necessário usar o servidor Temporal? R: Sim, o servidor gerencia estado e orquestração. Pode ser self-hosted ou usar Temporal Cloud.