基于 Neo4j 与 Raft 实现 API Gateway 的图关系型分布式限流架构


在设计一个复杂的 B2B SaaS 平台的 API Gateway 时,一个简单的基于键值(Key-Value)的速率限制器,例如使用 Redis 的 INCREXPIRE,很快就会暴露出其局限性。当限流规则不再是“每个用户每秒100次请求”这样简单,而是演变为“用户A属于组织B,组织B使用了套餐C,该套餐对API资源D的总调用量每月不超过100万次,同时,组织B的任何单个用户对资源D的调用每分钟不超过1000次”这类具有层级和关联关系的复杂业务约束时,传统模型便无能为力。

这种业务规则的本质是一个图(Graph)问题。用户、组织、订阅套餐、API资源之间通过不同的关系连接,限流决策需要遍历这个图的相关路径,并对路径上的所有约束进行原子性地校验和更新。这直接将我们引向了截然不同的技术选型。

定义复杂的技术问题:图结构的限流逻辑

首先,我们需要对问题进行精确的建模。限流的实体和它们之间的关系可以被可视化为一个属性图。

graph TD
    subgraph "Entities & Quotas"
        User(User A
key: user-a-key) Org(Organization B
id: org-b) Plan(Plan C
monthly_limit: 1,000,000) API(API Resource D
id: resource-d) UserLimit(User Limit
per_minute_limit: 1000) end User -- BELONGS_TO --> Org Org -- SUBSCRIBES_TO --> Plan Plan -- APPLIES_TO --> API User -- HAS_LIMIT_ON --> API UserLimit -- DEFINES --> User style User fill:#cce5ff,stroke:#333,stroke-width:2px style Org fill:#d4edda,stroke:#333,stroke-width:2px style Plan fill:#f8d7da,stroke:#333,stroke-width:2px style API fill:#fff3cd,stroke:#333,stroke-width:2px

这个模型的挑战在于,任何一次 API 调用,我们都需要:

  1. 原子性地检查从 User 节点出发,到 API 节点所有相关路径上的配额是否充足。
  2. 如果检查通过,必须原子性地更新所有相关路径上的计数器(例如,用户的分钟计数器和组织的月度计数器)。
  3. 整个决策过程(检查+更新)必须在极低延迟内完成(通常要求在15ms以内),以免成为 API Gateway 的性能瓶颈。
  4. 在分布式环境下,多个 API Gateway 实例对同一个组织的调用必须看到一致的配额视图,避免因并发竞争导致超额分配。

方案A:基于关系型数据库(RDBMS)的尝试

一个直接的想法是使用支持事务的关系型数据库(如 PostgreSQL)来存储这些关系。我们可以设计几张表:users, organizations, plans, api_resources 以及多个连接表。

优势分析:

  • 强一致性: RDBMS 的 ACID 事务是解决原子性更新问题的标准答案。一个 BEGIN...COMMIT 块可以确保检查和更新操作的原子性。
  • 技术成熟: 团队对 SQL 和 RDBMS 的运维经验丰富,技术栈引入成本低。

劣势分析:

  • 性能灾难: 查询层级关系在 SQL 中通常依赖递归公共表表达式(Recursive CTEs)。对于我们这种深度和广度都可能变化的图,这种查询的性能非常差。一次限流检查可能需要多次 JOIN 和复杂的子查询,在高并发场景下,数据库会迅速成为瓶颈,完全无法满足 API Gateway 的延迟要求。
  • 模型僵化: 业务规则的任何变更(例如增加“团队”层级)都可能导致复杂的 Schema 变更和数据迁移,缺乏灵活性。

在真实项目中,这种方案在原型验证阶段就因性能测试不达标而被否决。对一个简单的三层关系查询,在高并发下的 P99 延迟轻易超过了100ms。

方案B:引入图数据库 Neo4j

既然问题本质是图,那么使用图数据库 Neo4j 是一个自然的选择。它使用原生的图存储,对于遍历关系的查询具有无与伦比的性能优势。

数据模型定义 (Cypher DDL):

// Indexes for fast lookups
CREATE CONSTRAINT FOR (u:User) REQUIRE u.apiKey IS UNIQUE;
CREATE INDEX FOR (o:Organization) ON (o.orgId);
CREATE INDEX FOR (p:Plan) ON (p.planId);
CREATE INDEX FOR (r:ApiResource) ON (r.resourceId);

// Example data setup
CREATE (org:Organization {orgId: 'org-b', name: 'Org B Inc.'});
CREATE (user:User {apiKey: 'user-a-key-secret', name: 'User A', minuteCounter: 0, minuteTimestamp: 0});
CREATE (plan:Plan {planId: 'plan-c', monthlyLimit: 1000000, monthlyCounter: 0, monthlyTimestamp: 0});
CREATE (api:ApiResource {resourceId: 'resource-d', name: 'Data Processing API'});

MATCH (u:User {apiKey: 'user-a-key-secret'}), (o:Organization {orgId: 'org-b'})
CREATE (u)-[:BELONGS_TO]->(o);

MATCH (o:Organization {orgId: 'org-b'}), (p:Plan {planId: 'plan-c'})
CREATE (o)-[:SUBSCRIBES_TO]->(p);

MATCH (p:Plan {planId: 'plan-c'}), (api:ApiResource {resourceId: 'resource-d'})
CREATE (p)-[:APPLIES_TO]->(api);

限流检查查询 (Cypher):

这个查询必须在一个事务内完成检查和更新。

// Parameters: $apiKey, $resourceId, $currentMinute, $currentMonth
MATCH (user:User {apiKey: $apiKey})-[_b:BELONGS_TO]->(org:Organization)-[_s:SUBSCRIBES_TO]->(plan:Plan)-[_a:APPLIES_TO]->(api:ApiResource {resourceId: $resourceId})

// --- Reset counters if time window has passed ---
SET user.minuteCounter = CASE WHEN user.minuteTimestamp <> $currentMinute THEN 0 ELSE user.minuteCounter END
SET user.minuteTimestamp = $currentMinute
SET plan.monthlyCounter = CASE WHEN plan.monthlyTimestamp <> $currentMonth THEN 0 ELSE plan.monthlyCounter END
SET plan.monthlyTimestamp = $currentMonth

// --- Check all limits in the path ---
WITH user, plan,
     (user.minuteCounter < 1000) AS userLimitOk,
     (plan.monthlyCounter < plan.monthlyLimit) AS planLimitOk

// If all limits are OK, proceed to increment
WHERE userLimitOk AND planLimitOk
SET user.minuteCounter = user.minuteCounter + 1
SET plan.monthlyCounter = plan.monthlyCounter + 1

RETURN true AS allowed

优势分析:

  • 模型匹配: 数据模型与业务问题高度一致,直观且易于扩展。
  • 查询性能: 对于深度遍历,Neo4j 的性能远超 RDBMS。上述查询在索引支持下,通常可以在几个毫秒内完成。

劣势与新的挑战:
这个方案看似完美,但它引入了一个致命的新问题:分布式一致性。API Gateway 是无状态且水平扩展的。假设我们有10个 Gateway 实例,它们都在高并发地处理来自 org-b 的请求。

  1. 竞争条件: 两个 Gateway 实例可能同时读取到 plan.monthlyCounter999999。它们都认为配额充足,然后都执行了 +1 操作。由于 Neo4j 事务的隔离级别,最终的结果可能是 1000001,导致配额超发。虽然数据库事务能保证单次更新的原子性,但“读-改-写”的整个流程在应用层不是原子的。我们需要数据库级别的行锁或更强的并发控制。
  2. 性能瓶颈转移: 虽然查询快,但现在每次请求都需要与 Neo4j 进行一次网络通信并执行一个写事务。这依然是同步阻塞操作,在高吞吐量下,数据库的连接数和写性能会成为新的瓶颈。

单纯地将 Gateway 直连 Neo4j 只是将问题从“查询性能”转移到了“分布式写一致性与吞吐量”上。

最终选择:引入 Raft 协议构建一致性状态机

为了解决上述问题,我们决定将限流逻辑抽象为一个独立的、高可用的分布式状态机服务,而这个状态机的核心就是 Raft 一致性协议。

架构设计:

sequenceDiagram
    participant Client
    participant Gateway as API Gateway Instance 1
    participant GatewayN as API Gateway Instance N
    participant RaftLeader as Raft Leader Node
    participant RaftFollower as Raft Follower Node
    participant Neo4j as Neo4j Database

    Client->>Gateway: API Request (apiKey: user-a-key)
    Gateway->>RaftLeader: Propose Command: {Increment, user-a-key, resource-d}
    RaftLeader->>RaftFollower: AppendEntries RPC
    RaftFollower-->>RaftLeader: Acknowledge
    Note over RaftLeader,RaftFollower: Quorum reached, command committed.
    RaftLeader->>Neo4j: Execute transactional Cypher query
    Neo4j-->>RaftLeader: Query Result (Allowed: true)
    RaftLeader-->>Gateway: Command Result (Allowed: true)
    Gateway->>Client: Forward request to upstream

工作流解析:

  1. API Gateway 无状态化: Gateway 自身不再直接与 Neo4j 通信。它只做一件事:将限流请求转化为一个命令(Command),并将其发送给 Raft 集群的 Leader 节点。
  2. Raft 集群: 我们部署一个小的(3或5节点)Raft 集群。这个集群维护一个复制状态机(Replicated State Machine)
  3. 日志复制: Leader 接收到来自 Gateway 的命令(例如 Increment(user-a-key, resource-d))后,将其作为一条日志条目(Log Entry)写入自己的日志,并并发地发送给所有 Follower。
  4. 提交与应用: 当 Leader 收到大多数 Follower 的确认后,该日志条目被视为已提交(Committed)。此时,Raft 保证了这条日志在所有节点上最终都是一致且顺序相同的。Leader 节点上的状态机应用(Apply)这条日志。
  5. 状态机实现: 我们的状态机就是与 Neo4j 交互的逻辑。当应用日志时,它会执行之前设计的那个事务性 Cypher 查询。因为所有命令都在 Leader 上被序列化执行,这就从根本上解决了并发写的竞争问题。所有对配额的修改都通过一个单点的、高可用的、顺序化的日志来驱动。
  6. 结果返回: 状态机执行完 Neo4j 查询后,将结果(允许或拒绝)返回给 Gateway。

核心实现代码(使用 Go 和 hashicorp/raft 库)

这是一个高度简化的核心逻辑,展示了状态机如何与 Neo4j 驱动交互。

1. 定义状态机 (FSM - Finite State Machine)

package ratelimiter

import (
	"encoding/json"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/hashicorp/raft"
	"github.com/neo4j/neo4j-go-driver/v4/neo4j"
)

// Command represents a single operation to be applied to the state machine.
type Command struct {
	Op         string `json:"op"`
	ApiKey     string `json:"api_key"`
	ResourceID string `json:"resource_id"`
}

// Result represents the outcome of applying a command.
type ApplyResult struct {
	Allowed bool  `json:"allowed"`
	Error   error `json:"-"`
}

// fsm is our Raft state machine. It applies commands to Neo4j.
type fsm struct {
	mu         sync.Mutex
	driver     neo4j.Driver
	logger     *log.Logger // Assume a structured logger
}

func NewFSM(driver neo4j.Driver, logger *log.Logger) *fsm {
	return &fsm{
		driver: driver,
		logger: logger,
	}
}

// Apply applies a Raft log entry to the state machine.
// This is the core of our consistency model.
func (f *fsm) Apply(log *raft.Log) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()

	var cmd Command
	if err := json.Unmarshal(log.Data, &cmd); err != nil {
		f.logger.Error("Failed to unmarshal command", "error", err)
		return &ApplyResult{Allowed: false, Error: fmt.Errorf("invalid command format")}
	}

	f.logger.Info("Applying command", "op", cmd.Op, "apiKey", cmd.ApiKey)

	switch cmd.Op {
	case "increment":
		return f.handleIncrement(cmd)
	default:
		return &ApplyResult{Allowed: false, Error: fmt.Errorf("unsupported command: %s", cmd.Op)}
	}
}

func (f *fsm) handleIncrement(cmd Command) *ApplyResult {
	session := f.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
	defer session.Close()

	now := time.Now().UTC()
	currentMinute := now.Format("2006-01-02T15:04")
	currentMonth := now.Format("2006-01")

	params := map[string]interface{}{
		"apiKey":        cmd.ApiKey,
		"resourceId":    cmd.ResourceID,
		"currentMinute": currentMinute,
		"currentMonth":  currentMonth,
	}

	// This is where we execute the transactional Cypher query.
	// The Raft protocol ensures this method is called serially on the leader,
	// eliminating race conditions.
	result, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
		res, err := tx.Run(getRateLimitUpdateQuery(), params)
		if err != nil {
			return nil, err
		}

		if res.Next() {
			record := res.Record()
			allowed, ok := record.Get("allowed")
			if ok {
				return allowed.(bool), nil
			}
		}
		// If query returns no rows, it means one of the MATCH clauses failed,
		// e.g., invalid API key. We should deny access.
		return false, res.Err()
	})

	if err != nil {
		f.logger.Error("Neo4j transaction failed", "error", err, "apiKey", cmd.ApiKey)
		return &ApplyResult{Allowed: false, Error: err}
	}

	allowed := result.(bool)
	f.logger.Info("Command applied successfully", "allowed", allowed, "apiKey", cmd.ApiKey)
	return &ApplyResult{Allowed: allowed, Error: nil}
}

// getRateLimitUpdateQuery returns the complex Cypher query string.
func getRateLimitUpdateQuery() string {
    // Return the Cypher query from the previous section
	return `
		MATCH (user:User {apiKey: $apiKey})-[_b:BELONGS_TO]->(org:Organization)-[_s:SUBSCRIBES_TO]->(plan:Plan)-[_a:APPLIES_TO]->(api:ApiResource {resourceId: $resourceId})
		SET user.minuteCounter = CASE WHEN user.minuteTimestamp <> $currentMinute THEN 0 ELSE user.minuteCounter END,
		    user.minuteTimestamp = $currentMinute,
		    plan.monthlyCounter = CASE WHEN plan.monthlyTimestamp <> $currentMonth THEN 0 ELSE plan.monthlyCounter END,
		    plan.monthlyTimestamp = $currentMonth
		WITH user, plan,
		     (user.minuteCounter < 1000) AS userLimitOk,
		     (plan.monthlyCounter < plan.monthlyLimit) AS planLimitOk
		WHERE userLimitOk AND planLimitOk
		SET user.minuteCounter = user.minuteCounter + 1,
		    plan.monthlyCounter = plan.monthlyCounter + 1
		RETURN true AS allowed
	`
}

// Snapshot is used for log compaction. Not implemented here for brevity.
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
	return &fsmSnapshot{}, nil
}

// Restore restores the FSM from a snapshot. Not implemented.
func (f *fsm) Restore(rc io.ReadCloser) error {
	return nil
}

type fsmSnapshot struct{}
func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error { return sink.Close() }
func (s *fsmSnapshot) Release() {}

2. API Gateway Client Logic

package gateway

import (
	"encoding/json"
	"time"

	"github.com/hashicorp/raft"
	"project/ratelimiter"
)

type Client struct {
	raftNode *raft.Raft
}

func (c *Client) CheckAndIncrement(apiKey, resourceID string) (bool, error) {
	if c.raftNode.State() != raft.Leader {
		// In a real system, you'd have a mechanism to find the leader.
		// For example, by querying the Raft peers.
		return false, fmt.Errorf("not the raft leader")
	}

	cmd := ratelimiter.Command{
		Op:         "increment",
		ApiKey:     apiKey,
		ResourceID: resourceID,
	}

	cmdBytes, err := json.Marshal(cmd)
	if err != nil {
		return false, err
	}

	// Apply the command to the Raft log. This is a blocking call.
	future := c.raftNode.Apply(cmdBytes, 500*time.Millisecond)
	if err := future.Error(); err != nil {
		return false, err
	}

	// The response from future.Response() is the interface{} returned by fsm.Apply().
	response, ok := future.Response().(*ratelimiter.ApplyResult)
	if !ok {
		return false, fmt.Errorf("unexpected response type from raft")
	}

	if response.Error != nil {
		return false, response.Error
	}

	return response.Allowed, nil
}

架构的扩展性与局限性

这个架构通过将一致性问题委托给 Raft 协议,将复杂的图查询交给 Neo4j,实现了关注点分离。

扩展性:

  • 规则动态性: 增加新的限流维度(如团队、项目)只需要修改 Neo4j 的图模型和 Cypher 查询,Raft 和 Gateway 的核心逻辑完全不受影响。
  • 性能优化: Gateway 可以增加一个本地缓存(如 Caffeine 或 a Guava Cache)来缓存允许通过的结果,有效期为几秒钟。这可以极大地减少对 Raft 集群的请求压力,只有在缓存未命中或过期时才需要走分布式一致性路径。这种模式被称为“Lease Read”,可以在牺牲微小时间窗口内的精确性的前提下,大幅提升系统吞吐量。

局限性与适用边界:

  • 引入的延迟: 核心瓶颈在于 raft.Apply 的耗时。它包括:一次网络往返到 Leader,Leader 并行地与 Follower 通信,一次到 Neo4j 的写事务。这个延迟通常在 5ms 到 20ms 之间,取决于网络条件和 Raft/Neo4j 的部署。这对于某些需要亚毫秒级响应的场景是不可接受的。
  • 运维复杂性: 维护一个独立的、高可用的 Raft 集群和 Neo4j 集群,相比于简单的 Redis 方案,运维成本呈指数级增长。这包括监控、备份、故障恢复、版本升级等一系列复杂工作。
  • 适用场景: 此架构的复杂性和成本决定了它仅适用于那些限流规则极其复杂、具有内在图结构、并且对配额一致性要求极高的场景,例如金融交易、电信计费或复杂的云服务配额管理。对于绝大多数应用而言,更简单的解决方案往往是更务实的选择。

  目录