在 AWS EKS 上构建具备 ACID 保证的 CQRS 命令侧服务


在一个典型的CQRS系统中,命令处理器接收一个命令,执行业务逻辑,更新数据库中的聚合状态,然后发布一个领域事件。这个流程看似简单,但在分布式环境中却暗藏着一个致命的问题:如果在数据库事务提交成功后、事件发布到消息队列之前,服务实例发生崩溃或重启,会发生什么?

结果是灾难性的。写模型(Write Model)的状态已经改变,但本应通知读模型(Read Model)和其他下游系统的事件却永远丢失了。系统的数据进入了一种“分裂”状态,即写模型和读模型之间出现了永久性的不一致。在真实项目中,这种不一致性会直接导致业务故障和数据损坏。

传统的解决方案,例如两阶段提交(2PC)或跨数据库和消息队列的分布式事务,在基于Kubernetes的现代云原生架构中几乎是不可行的。它们过于复杂、性能低下,并且会与服务发现、弹性伸ซ缩等云原生特性产生严重冲突。我们需要一种更务实、更具韧性的方法。

本文将复盘一个在AWS EKS环境中,使用Go语言和PostgreSQL实现具备本地ACID事务保证的CQRS命令侧服务的全过程。核心思路是采用事务性发件箱(Transactional Outbox)模式,将状态变更和事件创建这两个操作,绑定在数据库的同一个本地ACID事务中。

初步构想与技术选型决策

我们的目标是确保“保存聚合状态”和“发布领域事件”这两个操作的原子性。要么都成功,要么都失败。Transactional Outbox模式的本质是将待发布的事件作为一条数据,与业务数据变更在同一个事务中持久化到数据库。随后,一个独立的、可靠的进程负责轮询这张“发件箱”表,并将事件真正地发布到消息中间件。

这个方案有几个显而易见的优势:

  1. 利用本地事务: 我们依赖的是数据库久经考验的ACID能力,避免了引入复杂的分布式事务协调器。
  2. 解耦: 命令处理的同步路径只关心核心业务逻辑和数据持久化,发布事件的逻辑被异步化,降低了请求延迟。
  3. 韧性: 即使在事件发布环节出现任何故障(网络问题、消息中间件不可用),由于事件已经可靠地存储在数据库中,发布器可以安全地重试,保证事件“至少一次”被传递。

基于此,技术栈的选择也变得清晰:

  • 运行时环境: AWS EKS。它提供了我们需要的弹性、可观测性和部署自动化能力。
  • 语言: Go。其高效的并发模型和简洁的语法非常适合构建云原生应用。我们将不依赖任何重型框架,以展示模式的本质。
  • 数据库: PostgreSQL on AWS RDS。它提供了强大的ACID事务支持和成熟的JSONB类型,非常适合存储结构化的事件载荷。
  • 消息中间件: AWS SQS/SNS。简单、可靠且与AWS生态系统深度集成。

步骤化实现:从数据库到Kubernetes

1. 数据库 Schema 设计

这是整个模式的基石。除了业务表(例如orders),我们还需要一张outbox_events表。

-- 业务聚合表:订单
CREATE TABLE orders (
    id UUID PRIMARY KEY,
    customer_id UUID NOT NULL,
    total_amount NUMERIC(10, 2) NOT NULL,
    status VARCHAR(50) NOT NULL,
    version INT NOT NULL DEFAULT 1,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 事务性发件箱表
CREATE TABLE outbox_events (
    id BIGSERIAL PRIMARY KEY,
    aggregate_id UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- `processed_at` 字段用于标记事件是否已被轮询器处理和发布。
    -- NULL 表示未处理,非 NULL 表示已处理。
    -- 创建索引以加速轮询查询。
    processed_at TIMESTAMPTZ
);

CREATE INDEX idx_outbox_events_unprocessed ON outbox_events (processed_at) WHERE processed_at IS NULL;

这里的关键是outbox_events表。processed_at字段是核心,我们的轮询器将只查询processed_at IS NULL的记录。这个索引的WHERE子句至关重要,它使得索引只包含未处理的事件,极大地提高了查询效率。

2. Go 项目结构

一个清晰的项目结构对于可维护性至关重要。

/order-service
├── cmd
│   ├── api         # API服务主入口
│   └── outbox      # 发件箱轮询器主入口
├── internal
│   ├── application # 应用层/命令处理器
│   ├── domain      # 领域模型和事件
│   ├── infrastructure
│   │   ├── persistence # 数据库仓储实现
│   │   └── publisher   # 事件发布器实现 (SQS)
│   └── outbox        # 发件箱轮询器逻辑
├── api             # HTTP API 定义
└── deploy          # Kubernetes manifests

3. 核心代码:事务性仓储

仓储(Repository)层是实现原子性操作的核心。它的方法必须接收一个数据库事务句柄pgx.Tx作为参数,而不是一个连接池pgx.Pool。这确保了调用方可以将多个数据库操作组合在同一个事务中。

internal/infrastructure/persistence/order_repository.go:

package persistence

import (
	"context"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/your-org/order-service/internal/domain"
)

type OrderRepository struct{}

func NewOrderRepository() *OrderRepository {
	return &OrderRepository{}
}

// SaveOrderInTx 在一个给定的数据库事务中保存订单和其领域事件。
// 这是保证原子性的关键:更新订单状态和插入发件箱事件必须在同一个事务中完成。
func (r *OrderRepository) SaveOrderInTx(ctx context.Context, tx pgx.Tx, order *domain.Order, event *domain.OutboxEvent) error {
	// 1. 更新或插入订单聚合
	// 使用乐观锁 (version) 来处理并发写入冲突
	updateSQL := `
        UPDATE orders
        SET total_amount = $2, status = $3, version = version + 1, updated_at = $4
        WHERE id = $1 AND version = $5`

	cmdTag, err := tx.Exec(ctx, updateSQL, order.ID, order.TotalAmount, order.Status, time.Now(), order.Version)
	if err != nil {
		// 这里的错误处理很关键,需要区分数据库错误和乐观锁冲突
		// pgx 会在连接层面处理网络错误,这里主要是SQL执行错误
		return err // 外部的事务管理器会回滚
	}

	if cmdTag.RowsAffected() == 0 {
		return domain.ErrOrderConcurrency // 自定义错误,表示版本冲突
	}
	
	order.Version++ // 更新内存中的版本号

	// 2. 将领域事件插入到发件箱表
	insertEventSQL := `
        INSERT INTO outbox_events (aggregate_id, aggregate_type, event_type, payload)
        VALUES ($1, $2, $3, $4)`

	_, err = tx.Exec(ctx, insertEventSQL, event.AggregateID, event.AggregateType, event.EventType, event.Payload)
	if err != nil {
		return err // 同样,让外部事务管理器回滚
	}

	return nil
}

// FindByID 用于获取订单,通常在事务外执行
func (r *OrderRepository) FindByID(ctx context.Context, pool PGXPool, id uuid.UUID) (*domain.Order, error) {
    // ... 实现查询逻辑 ...
    return nil, nil
}

// PGXPool 是一个接口,方便测试时 mock
type PGXPool interface {
    Begin(context.Context) (pgx.Tx, error)
    // ... 其他需要的方法
}

注意 SaveOrderInTx 的设计。它强制调用者提供一个事务 pgx.Tx,从而将事务控制权交给了上层的应用服务。函数内部执行了两个SQL操作,它们要么一起成功,要么一起失败。

4. 应用服务与命令处理器

应用服务负责编排整个流程:开始事务、调用领域逻辑、使用仓储持久化、最后提交或回滚事务。

internal/application/order_service.go:

package application

import (
	"context"
	"encoding/json"
	"log/slog"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/your-org/order-service/internal/domain"
	"github.com/your-org/order-service/internal/infrastructure/persistence"
)

type OrderService struct {
	pool       *pgxpool.Pool
	repo       *persistence.OrderRepository
	logger     *slog.Logger
}

func NewOrderService(pool *pgxpool.Pool, repo *persistence.OrderRepository, logger *slog.Logger) *OrderService {
	return &OrderService{pool: pool, repo: repo, logger: logger}
}

type CreateOrderCommand struct {
	CustomerID  uuid.UUID
	TotalAmount float64
}

func (s *OrderService) CreateOrder(ctx context.Context, cmd CreateOrderCommand) (*domain.Order, error) {
	// 创建新的订单聚合
	order, err := domain.NewOrder(cmd.CustomerID, cmd.TotalAmount)
	if err != nil {
		return nil, err
	}

	// 准备领域事件
	eventPayload, _ := json.Marshal(map[string]interface{}{
		"orderId":     order.ID,
		"customerId":  order.CustomerID,
		"totalAmount": order.TotalAmount,
	})

	outboxEvent := &domain.OutboxEvent{
		AggregateID:   order.ID,
		AggregateType: "Order",
		EventType:     "OrderCreated",
		Payload:       eventPayload,
	}

	// 开启数据库事务
	tx, err := s.pool.Begin(ctx)
	if err != nil {
		s.logger.Error("failed to begin transaction", "error", err)
		return nil, err
	}
	// 确保事务在函数退出时被处理
	defer tx.Rollback(ctx)

	// 使用事务性仓储方法
	// 这是一个模拟的新建操作,实际生产中需要区分Create和Update
	// 这里为了简化,直接调用 SaveOrderInTx(假设已有记录)
	// 正确的实现是先 Insert Order,再 Insert Outbox Event
	// err = s.repo.CreateInTx(ctx, tx, order, outboxEvent) 
    
    // 假设是更新操作
	err = s.repo.SaveOrderInTx(ctx, tx, order, outboxEvent)
	if err != nil {
		if err == domain.ErrOrderConcurrency {
			s.logger.Warn("concurrency conflict on order update", "orderId", order.ID)
		} else {
			s.logger.Error("failed to save order in transaction", "error",err)
		}
		return nil, err
	}

	// 提交事务
	if err := tx.Commit(ctx); err != nil {
		s.logger.Error("failed to commit transaction", "error", err)
		return nil, err
	}

	s.logger.Info("order created and event saved to outbox", "orderId", order.ID)
	return order, nil
}

这里的 defer tx.Rollback(ctx) 是一个非常重要的防御性编程技巧。如果在 tx.Commit(ctx) 之前发生任何 panicreturn,事务都会被安全地回滚。只有在所有操作成功并显式调用 Commit 后,数据库的变更才会生效。

5. 发件箱轮询器 (Outbox Processor)

这是个独立的、持续运行的组件。它可以是主应用中的一个后台goroutine,但在EKS环境中,更健壮的做法是将其部署为一个独立的Deployment。这允许我们独立地扩展和监控它。

internal/outbox/processor.go:

package outbox

import (
	"context"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/your-org/order-service/internal/domain"
	"github.com/your-org/order-service/internal/infrastructure/publisher"
)

type Processor struct {
	pool      *pgxpool.Pool
	publisher publisher.EventPublisher
	logger    *slog.Logger
	batchSize int
}

func NewProcessor(pool *pgxpool.Pool, publisher publisher.EventPublisher, logger *slog.Logger, batchSize int) *Processor {
	return &Processor{pool: pool, publisher: publisher, logger: logger, batchSize: batchSize}
}

func (p *Processor) Start(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	p.logger.Info("outbox processor started")

	for {
		select {
		case <-ctx.Done():
			p.logger.Info("outbox processor shutting down")
			return
		case <-ticker.C:
			if err := p.processBatch(ctx); err != nil {
				p.logger.Error("failed to process outbox batch", "error", err)
			}
		}
	}
}

func (p *Processor) processBatch(ctx context.Context) error {
	tx, err := p.pool.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)

	// 使用 FOR UPDATE SKIP LOCKED 实现行级锁,允许多个轮询器实例并发工作而不会处理相同的事件。
	// 这是水平扩展的关键。
	rows, err := tx.Query(ctx, `
        SELECT id, aggregate_id, aggregate_type, event_type, payload
        FROM outbox_events
        WHERE processed_at IS NULL
        ORDER BY created_at
        LIMIT $1
        FOR UPDATE SKIP LOCKED`, p.batchSize)
	if err != nil {
		return err
	}
	defer rows.Close()

	var eventsToProcess []domain.OutboxEvent
	var eventIDsToUpdate []int64

	for rows.Next() {
		var event domain.OutboxEvent
		var id int64
		if err := rows.Scan(&id, &event.AggregateID, &event.AggregateType, &event.EventType, &event.Payload); err != nil {
			// 在真实项目中,这里应该有更完善的错误处理,可能需要跳过这条损坏的记录
			p.logger.Error("failed to scan outbox event", "error", err)
			continue
		}
		event.ID = id
		eventsToProcess = append(eventsToProcess, event)
		eventIDsToUpdate = append(eventIDsToUpdate, id)
	}

	if len(eventsToProcess) == 0 {
		return nil // 没有需要处理的事件
	}

	p.logger.Info("processing outbox events", "count", len(eventsToProcess))

	// 批量或逐条发布事件到消息队列
	for _, event := range eventsToProcess {
		if err := p.publisher.Publish(ctx, event); err != nil {
			// 如果发布失败,由于事务会回滚,这些事件将在下一轮被重新处理。
			// 这里需要配置好发布器的重试和死信队列策略。
			p.logger.Error("failed to publish event", "eventId", event.ID, "error", err)
			return err
		}
	}

	// 所有事件都成功发布后,更新发件箱表,将它们标记为已处理
	_, err = tx.Exec(ctx, `
        UPDATE outbox_events
        SET processed_at = NOW()
        WHERE id = ANY($1)`, eventIDsToUpdate)
	if err != nil {
		return err
	}

	return tx.Commit(ctx)
}

FOR UPDATE SKIP LOCKED 是PostgreSQL的一个强大特性,它允许当前事务锁定选定的行,而其他并发事务如果尝试锁定相同的行,则会直接跳过它们,而不是等待。这使得我们可以安全地运行多个轮询器实例,每个实例处理自己锁定的一批事件,从而实现水平扩展。

6. 部署到 AWS EKS

我们将创建两个独立的Kubernetes Deployment。

deploy/api-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-api
  labels:
    app: order-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-api
  template:
    metadata:
      labels:
        app: order-api
    spec:
      containers:
      - name: order-api
        image: your-account.dkr.ecr.us-east-1.amazonaws.com/order-service:latest
        command: ["/app/order-api"] # 指定运行API服务的可执行文件
        ports:
        - containerPort: 8080
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: order-db-secret
              key: url
        # ... 其他环境变量
        resources:
          requests:
            cpu: "250m"
            memory: "256Mi"
          limits:
            cpu: "500m"
            memory: "512Mi"
        readinessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 20

deploy/outbox-processor-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: outbox-processor
  labels:
    app: outbox-processor
spec:
  replicas: 2 # 可以安全地运行多个副本
  selector:
    matchLabels:
      app: outbox-processor
  template:
    metadata:
      labels:
        app: outbox-processor
    spec:
      containers:
      - name: outbox-processor
        image: your-account.dkr.ecr.us-east-1.amazonaws.com/order-service:latest
        command: ["/app/outbox-processor"] # 指定运行轮询器的可执行文件
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: order-db-secret
              key: url
        - name: SQS_QUEUE_URL
          valueFrom:
            configMapKeyRef:
              name: order-service-config
              key: sqs.queue.url
        - name: OUTBOX_INTERVAL_MS
          value: "1000"
        - name: OUTBOX_BATCH_SIZE
          value: "100"
        resources:
          requests:
            cpu: "100m"
            memory: "128Mi"
          limits:
            cpu: "200m"
            memory: "256Mi"

将这两个服务分开部署,提供了极大的灵活性。API服务的资源需求可能与CPU和内存有关,而轮询器的资源需求则与数据库IO和网络有关。我们可以根据各自的负载独立地调整它们的副本数和资源分配。

最终成果与流程图

我们成功构建了一个在EKS上运行的、具备强一致性保证的CQRS命令侧服务。即使在Pod崩溃、网络分区或消息中间件短暂不可用的情况下,系统也能保证数据状态和领域事件的最终一致性。

整个流程可以用下面的时序图清晰地展示出来:

sequenceDiagram
    participant Client
    participant API Service (EKS Pod)
    participant PostgreSQL (RDS)
    participant Outbox Processor (EKS Pod)
    participant SQS

    Client->>API Service (EKS Pod): POST /orders (CreateOrderCommand)
    activate API Service (EKS Pod)
    API Service (EKS Pod)->>PostgreSQL (RDS): BEGIN TRANSACTION
    activate PostgreSQL (RDS)
    Note over API Service (EKS Pod), PostgreSQL (RDS): 在单个ACID事务中执行
    API Service (EKS Pod)->>PostgreSQL (RDS): INSERT INTO orders (...)
    API Service (EKS Pod)->>PostgreSQL (RDS): INSERT INTO outbox_events (...)
    PostgreSQL (RDS)-->>API Service (EKS Pod): Success
    API Service (EKS Pod)->>PostgreSQL (RDS): COMMIT
    deactivate PostgreSQL (RDS)
    API Service (EKS Pod)-->>Client: 201 Created
    deactivate API Service (EKS Pod)

    loop 每隔N秒轮询
        Outbox Processor (EKS Pod)->>PostgreSQL (RDS): SELECT ... FROM outbox_events ... FOR UPDATE SKIP LOCKED
        activate PostgreSQL (RDS)
        PostgreSQL (RDS)-->>Outbox Processor (EKS Pod): [Event1, Event2, ...]
        deactivate PostgreSQL (RDS)
        
        alt 成功获取到事件
            Outbox Processor (EKS Pod)->>SQS: Publish(Event1)
            activate SQS
            SQS-->>Outbox Processor (EKS Pod): ACK
            deactivate SQS

            Outbox Processor (EKS Pod)->>SQS: Publish(Event2)
            activate SQS
            SQS-->>Outbox Processor (EKS Pod): ACK
            deactivate SQS

            Outbox Processor (EKS Pod)->>PostgreSQL (RDS): BEGIN TRANSACTION
            activate PostgreSQL (RDS)
            Outbox Processor (EKS Pod)->>PostgreSQL (RDS): UPDATE outbox_events SET processed_at=NOW() WHERE id IN (...)
            Outbox Processor (EKS Pod)->>PostgreSQL (RDS): COMMIT
            deactivate PostgreSQL (RDS)
        end
    end

方案的局限性与未来迭代

虽然事务性发件箱模式非常强大和务实,但它并非没有权衡。

  1. 事件延迟: 引入了额外的延迟。事件的发布时间取决于轮询器的轮询间隔。对于需要极低延迟的场景,这可能不是最佳选择。
  2. 发件箱表膨胀: outbox_events表会持续增长。在生产环境中,必须有一个定期的清理任务(归档或删除)来处理已经被处理过的旧事件,否则会影响数据库性能。
  3. 轮询开销: 即使没有新事件,轮询器也会持续对数据库产生查询负载。虽然通过优化索引可以将其降到很低,但负载依然存在。

一个更高级的替代方案是使用**变更数据捕获 (Change Data Capture, CDC)**,例如通过Debezium。CDC直接监听数据库的事务日志(如PostgreSQL的WAL),并将变更实时流式传输到消息队列。这种方式对业务代码是完全透明的,延迟更低,且避免了轮询。然而,CDC方案的引入会增加架构的复杂度,需要部署和维护额外的组件(如Kafka Connect、Debezium Connector),这对团队的运维能力提出了更高的要求。对于许多项目而言,本文实现的轮询式发件箱模式是在简单性、可靠性和性能之间的一个极佳平衡点。


  目录