在一个典型的CQRS系统中,命令处理器接收一个命令,执行业务逻辑,更新数据库中的聚合状态,然后发布一个领域事件。这个流程看似简单,但在分布式环境中却暗藏着一个致命的问题:如果在数据库事务提交成功后、事件发布到消息队列之前,服务实例发生崩溃或重启,会发生什么?
结果是灾难性的。写模型(Write Model)的状态已经改变,但本应通知读模型(Read Model)和其他下游系统的事件却永远丢失了。系统的数据进入了一种“分裂”状态,即写模型和读模型之间出现了永久性的不一致。在真实项目中,这种不一致性会直接导致业务故障和数据损坏。
传统的解决方案,例如两阶段提交(2PC)或跨数据库和消息队列的分布式事务,在基于Kubernetes的现代云原生架构中几乎是不可行的。它们过于复杂、性能低下,并且会与服务发现、弹性伸ซ缩等云原生特性产生严重冲突。我们需要一种更务实、更具韧性的方法。
本文将复盘一个在AWS EKS环境中,使用Go语言和PostgreSQL实现具备本地ACID事务保证的CQRS命令侧服务的全过程。核心思路是采用事务性发件箱(Transactional Outbox)模式,将状态变更和事件创建这两个操作,绑定在数据库的同一个本地ACID事务中。
初步构想与技术选型决策
我们的目标是确保“保存聚合状态”和“发布领域事件”这两个操作的原子性。要么都成功,要么都失败。Transactional Outbox模式的本质是将待发布的事件作为一条数据,与业务数据变更在同一个事务中持久化到数据库。随后,一个独立的、可靠的进程负责轮询这张“发件箱”表,并将事件真正地发布到消息中间件。
这个方案有几个显而易见的优势:
- 利用本地事务: 我们依赖的是数据库久经考验的ACID能力,避免了引入复杂的分布式事务协调器。
- 解耦: 命令处理的同步路径只关心核心业务逻辑和数据持久化,发布事件的逻辑被异步化,降低了请求延迟。
- 韧性: 即使在事件发布环节出现任何故障(网络问题、消息中间件不可用),由于事件已经可靠地存储在数据库中,发布器可以安全地重试,保证事件“至少一次”被传递。
基于此,技术栈的选择也变得清晰:
- 运行时环境: AWS EKS。它提供了我们需要的弹性、可观测性和部署自动化能力。
- 语言: Go。其高效的并发模型和简洁的语法非常适合构建云原生应用。我们将不依赖任何重型框架,以展示模式的本质。
- 数据库: PostgreSQL on AWS RDS。它提供了强大的ACID事务支持和成熟的JSONB类型,非常适合存储结构化的事件载荷。
- 消息中间件: AWS SQS/SNS。简单、可靠且与AWS生态系统深度集成。
步骤化实现:从数据库到Kubernetes
1. 数据库 Schema 设计
这是整个模式的基石。除了业务表(例如orders
),我们还需要一张outbox_events
表。
-- 业务聚合表:订单
CREATE TABLE orders (
id UUID PRIMARY KEY,
customer_id UUID NOT NULL,
total_amount NUMERIC(10, 2) NOT NULL,
status VARCHAR(50) NOT NULL,
version INT NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 事务性发件箱表
CREATE TABLE outbox_events (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- `processed_at` 字段用于标记事件是否已被轮询器处理和发布。
-- NULL 表示未处理,非 NULL 表示已处理。
-- 创建索引以加速轮询查询。
processed_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_events_unprocessed ON outbox_events (processed_at) WHERE processed_at IS NULL;
这里的关键是outbox_events
表。processed_at
字段是核心,我们的轮询器将只查询processed_at IS NULL
的记录。这个索引的WHERE
子句至关重要,它使得索引只包含未处理的事件,极大地提高了查询效率。
2. Go 项目结构
一个清晰的项目结构对于可维护性至关重要。
/order-service
├── cmd
│ ├── api # API服务主入口
│ └── outbox # 发件箱轮询器主入口
├── internal
│ ├── application # 应用层/命令处理器
│ ├── domain # 领域模型和事件
│ ├── infrastructure
│ │ ├── persistence # 数据库仓储实现
│ │ └── publisher # 事件发布器实现 (SQS)
│ └── outbox # 发件箱轮询器逻辑
├── api # HTTP API 定义
└── deploy # Kubernetes manifests
3. 核心代码:事务性仓储
仓储(Repository)层是实现原子性操作的核心。它的方法必须接收一个数据库事务句柄pgx.Tx
作为参数,而不是一个连接池pgx.Pool
。这确保了调用方可以将多个数据库操作组合在同一个事务中。
internal/infrastructure/persistence/order_repository.go
:
package persistence
import (
"context"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/your-org/order-service/internal/domain"
)
type OrderRepository struct{}
func NewOrderRepository() *OrderRepository {
return &OrderRepository{}
}
// SaveOrderInTx 在一个给定的数据库事务中保存订单和其领域事件。
// 这是保证原子性的关键:更新订单状态和插入发件箱事件必须在同一个事务中完成。
func (r *OrderRepository) SaveOrderInTx(ctx context.Context, tx pgx.Tx, order *domain.Order, event *domain.OutboxEvent) error {
// 1. 更新或插入订单聚合
// 使用乐观锁 (version) 来处理并发写入冲突
updateSQL := `
UPDATE orders
SET total_amount = $2, status = $3, version = version + 1, updated_at = $4
WHERE id = $1 AND version = $5`
cmdTag, err := tx.Exec(ctx, updateSQL, order.ID, order.TotalAmount, order.Status, time.Now(), order.Version)
if err != nil {
// 这里的错误处理很关键,需要区分数据库错误和乐观锁冲突
// pgx 会在连接层面处理网络错误,这里主要是SQL执行错误
return err // 外部的事务管理器会回滚
}
if cmdTag.RowsAffected() == 0 {
return domain.ErrOrderConcurrency // 自定义错误,表示版本冲突
}
order.Version++ // 更新内存中的版本号
// 2. 将领域事件插入到发件箱表
insertEventSQL := `
INSERT INTO outbox_events (aggregate_id, aggregate_type, event_type, payload)
VALUES ($1, $2, $3, $4)`
_, err = tx.Exec(ctx, insertEventSQL, event.AggregateID, event.AggregateType, event.EventType, event.Payload)
if err != nil {
return err // 同样,让外部事务管理器回滚
}
return nil
}
// FindByID 用于获取订单,通常在事务外执行
func (r *OrderRepository) FindByID(ctx context.Context, pool PGXPool, id uuid.UUID) (*domain.Order, error) {
// ... 实现查询逻辑 ...
return nil, nil
}
// PGXPool 是一个接口,方便测试时 mock
type PGXPool interface {
Begin(context.Context) (pgx.Tx, error)
// ... 其他需要的方法
}
注意 SaveOrderInTx
的设计。它强制调用者提供一个事务 pgx.Tx
,从而将事务控制权交给了上层的应用服务。函数内部执行了两个SQL操作,它们要么一起成功,要么一起失败。
4. 应用服务与命令处理器
应用服务负责编排整个流程:开始事务、调用领域逻辑、使用仓储持久化、最后提交或回滚事务。
internal/application/order_service.go
:
package application
import (
"context"
"encoding/json"
"log/slog"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/your-org/order-service/internal/domain"
"github.com/your-org/order-service/internal/infrastructure/persistence"
)
type OrderService struct {
pool *pgxpool.Pool
repo *persistence.OrderRepository
logger *slog.Logger
}
func NewOrderService(pool *pgxpool.Pool, repo *persistence.OrderRepository, logger *slog.Logger) *OrderService {
return &OrderService{pool: pool, repo: repo, logger: logger}
}
type CreateOrderCommand struct {
CustomerID uuid.UUID
TotalAmount float64
}
func (s *OrderService) CreateOrder(ctx context.Context, cmd CreateOrderCommand) (*domain.Order, error) {
// 创建新的订单聚合
order, err := domain.NewOrder(cmd.CustomerID, cmd.TotalAmount)
if err != nil {
return nil, err
}
// 准备领域事件
eventPayload, _ := json.Marshal(map[string]interface{}{
"orderId": order.ID,
"customerId": order.CustomerID,
"totalAmount": order.TotalAmount,
})
outboxEvent := &domain.OutboxEvent{
AggregateID: order.ID,
AggregateType: "Order",
EventType: "OrderCreated",
Payload: eventPayload,
}
// 开启数据库事务
tx, err := s.pool.Begin(ctx)
if err != nil {
s.logger.Error("failed to begin transaction", "error", err)
return nil, err
}
// 确保事务在函数退出时被处理
defer tx.Rollback(ctx)
// 使用事务性仓储方法
// 这是一个模拟的新建操作,实际生产中需要区分Create和Update
// 这里为了简化,直接调用 SaveOrderInTx(假设已有记录)
// 正确的实现是先 Insert Order,再 Insert Outbox Event
// err = s.repo.CreateInTx(ctx, tx, order, outboxEvent)
// 假设是更新操作
err = s.repo.SaveOrderInTx(ctx, tx, order, outboxEvent)
if err != nil {
if err == domain.ErrOrderConcurrency {
s.logger.Warn("concurrency conflict on order update", "orderId", order.ID)
} else {
s.logger.Error("failed to save order in transaction", "error",err)
}
return nil, err
}
// 提交事务
if err := tx.Commit(ctx); err != nil {
s.logger.Error("failed to commit transaction", "error", err)
return nil, err
}
s.logger.Info("order created and event saved to outbox", "orderId", order.ID)
return order, nil
}
这里的 defer tx.Rollback(ctx)
是一个非常重要的防御性编程技巧。如果在 tx.Commit(ctx)
之前发生任何 panic
或 return
,事务都会被安全地回滚。只有在所有操作成功并显式调用 Commit
后,数据库的变更才会生效。
5. 发件箱轮询器 (Outbox Processor)
这是个独立的、持续运行的组件。它可以是主应用中的一个后台goroutine,但在EKS环境中,更健壮的做法是将其部署为一个独立的Deployment。这允许我们独立地扩展和监控它。
internal/outbox/processor.go
:
package outbox
import (
"context"
"log/slog"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/your-org/order-service/internal/domain"
"github.com/your-org/order-service/internal/infrastructure/publisher"
)
type Processor struct {
pool *pgxpool.Pool
publisher publisher.EventPublisher
logger *slog.Logger
batchSize int
}
func NewProcessor(pool *pgxpool.Pool, publisher publisher.EventPublisher, logger *slog.Logger, batchSize int) *Processor {
return &Processor{pool: pool, publisher: publisher, logger: logger, batchSize: batchSize}
}
func (p *Processor) Start(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
p.logger.Info("outbox processor started")
for {
select {
case <-ctx.Done():
p.logger.Info("outbox processor shutting down")
return
case <-ticker.C:
if err := p.processBatch(ctx); err != nil {
p.logger.Error("failed to process outbox batch", "error", err)
}
}
}
}
func (p *Processor) processBatch(ctx context.Context) error {
tx, err := p.pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
// 使用 FOR UPDATE SKIP LOCKED 实现行级锁,允许多个轮询器实例并发工作而不会处理相同的事件。
// 这是水平扩展的关键。
rows, err := tx.Query(ctx, `
SELECT id, aggregate_id, aggregate_type, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED`, p.batchSize)
if err != nil {
return err
}
defer rows.Close()
var eventsToProcess []domain.OutboxEvent
var eventIDsToUpdate []int64
for rows.Next() {
var event domain.OutboxEvent
var id int64
if err := rows.Scan(&id, &event.AggregateID, &event.AggregateType, &event.EventType, &event.Payload); err != nil {
// 在真实项目中,这里应该有更完善的错误处理,可能需要跳过这条损坏的记录
p.logger.Error("failed to scan outbox event", "error", err)
continue
}
event.ID = id
eventsToProcess = append(eventsToProcess, event)
eventIDsToUpdate = append(eventIDsToUpdate, id)
}
if len(eventsToProcess) == 0 {
return nil // 没有需要处理的事件
}
p.logger.Info("processing outbox events", "count", len(eventsToProcess))
// 批量或逐条发布事件到消息队列
for _, event := range eventsToProcess {
if err := p.publisher.Publish(ctx, event); err != nil {
// 如果发布失败,由于事务会回滚,这些事件将在下一轮被重新处理。
// 这里需要配置好发布器的重试和死信队列策略。
p.logger.Error("failed to publish event", "eventId", event.ID, "error", err)
return err
}
}
// 所有事件都成功发布后,更新发件箱表,将它们标记为已处理
_, err = tx.Exec(ctx, `
UPDATE outbox_events
SET processed_at = NOW()
WHERE id = ANY($1)`, eventIDsToUpdate)
if err != nil {
return err
}
return tx.Commit(ctx)
}
FOR UPDATE SKIP LOCKED
是PostgreSQL的一个强大特性,它允许当前事务锁定选定的行,而其他并发事务如果尝试锁定相同的行,则会直接跳过它们,而不是等待。这使得我们可以安全地运行多个轮询器实例,每个实例处理自己锁定的一批事件,从而实现水平扩展。
6. 部署到 AWS EKS
我们将创建两个独立的Kubernetes Deployment。
deploy/api-deployment.yaml
:
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-api
labels:
app: order-api
spec:
replicas: 3
selector:
matchLabels:
app: order-api
template:
metadata:
labels:
app: order-api
spec:
containers:
- name: order-api
image: your-account.dkr.ecr.us-east-1.amazonaws.com/order-service:latest
command: ["/app/order-api"] # 指定运行API服务的可执行文件
ports:
- containerPort: 8080
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: order-db-secret
key: url
# ... 其他环境变量
resources:
requests:
cpu: "250m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"
readinessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 15
periodSeconds: 20
deploy/outbox-processor-deployment.yaml
:
apiVersion: apps/v1
kind: Deployment
metadata:
name: outbox-processor
labels:
app: outbox-processor
spec:
replicas: 2 # 可以安全地运行多个副本
selector:
matchLabels:
app: outbox-processor
template:
metadata:
labels:
app: outbox-processor
spec:
containers:
- name: outbox-processor
image: your-account.dkr.ecr.us-east-1.amazonaws.com/order-service:latest
command: ["/app/outbox-processor"] # 指定运行轮询器的可执行文件
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: order-db-secret
key: url
- name: SQS_QUEUE_URL
valueFrom:
configMapKeyRef:
name: order-service-config
key: sqs.queue.url
- name: OUTBOX_INTERVAL_MS
value: "1000"
- name: OUTBOX_BATCH_SIZE
value: "100"
resources:
requests:
cpu: "100m"
memory: "128Mi"
limits:
cpu: "200m"
memory: "256Mi"
将这两个服务分开部署,提供了极大的灵活性。API服务的资源需求可能与CPU和内存有关,而轮询器的资源需求则与数据库IO和网络有关。我们可以根据各自的负载独立地调整它们的副本数和资源分配。
最终成果与流程图
我们成功构建了一个在EKS上运行的、具备强一致性保证的CQRS命令侧服务。即使在Pod崩溃、网络分区或消息中间件短暂不可用的情况下,系统也能保证数据状态和领域事件的最终一致性。
整个流程可以用下面的时序图清晰地展示出来:
sequenceDiagram participant Client participant API Service (EKS Pod) participant PostgreSQL (RDS) participant Outbox Processor (EKS Pod) participant SQS Client->>API Service (EKS Pod): POST /orders (CreateOrderCommand) activate API Service (EKS Pod) API Service (EKS Pod)->>PostgreSQL (RDS): BEGIN TRANSACTION activate PostgreSQL (RDS) Note over API Service (EKS Pod), PostgreSQL (RDS): 在单个ACID事务中执行 API Service (EKS Pod)->>PostgreSQL (RDS): INSERT INTO orders (...) API Service (EKS Pod)->>PostgreSQL (RDS): INSERT INTO outbox_events (...) PostgreSQL (RDS)-->>API Service (EKS Pod): Success API Service (EKS Pod)->>PostgreSQL (RDS): COMMIT deactivate PostgreSQL (RDS) API Service (EKS Pod)-->>Client: 201 Created deactivate API Service (EKS Pod) loop 每隔N秒轮询 Outbox Processor (EKS Pod)->>PostgreSQL (RDS): SELECT ... FROM outbox_events ... FOR UPDATE SKIP LOCKED activate PostgreSQL (RDS) PostgreSQL (RDS)-->>Outbox Processor (EKS Pod): [Event1, Event2, ...] deactivate PostgreSQL (RDS) alt 成功获取到事件 Outbox Processor (EKS Pod)->>SQS: Publish(Event1) activate SQS SQS-->>Outbox Processor (EKS Pod): ACK deactivate SQS Outbox Processor (EKS Pod)->>SQS: Publish(Event2) activate SQS SQS-->>Outbox Processor (EKS Pod): ACK deactivate SQS Outbox Processor (EKS Pod)->>PostgreSQL (RDS): BEGIN TRANSACTION activate PostgreSQL (RDS) Outbox Processor (EKS Pod)->>PostgreSQL (RDS): UPDATE outbox_events SET processed_at=NOW() WHERE id IN (...) Outbox Processor (EKS Pod)->>PostgreSQL (RDS): COMMIT deactivate PostgreSQL (RDS) end end
方案的局限性与未来迭代
虽然事务性发件箱模式非常强大和务实,但它并非没有权衡。
- 事件延迟: 引入了额外的延迟。事件的发布时间取决于轮询器的轮询间隔。对于需要极低延迟的场景,这可能不是最佳选择。
- 发件箱表膨胀:
outbox_events
表会持续增长。在生产环境中,必须有一个定期的清理任务(归档或删除)来处理已经被处理过的旧事件,否则会影响数据库性能。 - 轮询开销: 即使没有新事件,轮询器也会持续对数据库产生查询负载。虽然通过优化索引可以将其降到很低,但负载依然存在。
一个更高级的替代方案是使用**变更数据捕获 (Change Data Capture, CDC)**,例如通过Debezium。CDC直接监听数据库的事务日志(如PostgreSQL的WAL),并将变更实时流式传输到消息队列。这种方式对业务代码是完全透明的,延迟更低,且避免了轮询。然而,CDC方案的引入会增加架构的复杂度,需要部署和维护额外的组件(如Kafka Connect、Debezium Connector),这对团队的运维能力提出了更高的要求。对于许多项目而言,本文实现的轮询式发件箱模式是在简单性、可靠性和性能之间的一个极佳平衡点。