在 Ruby on Rails 中利用 etcd 构建管理异构 AI 知识源的动态插件架构


团队在敏捷开发模式下推进一个复杂的 RAG (检索增强生成) 系统时,我们很快撞上了一堵墙。最初系统只对接了 ChromaDB,用于处理非结构化文本的向量检索。但随着业务迭代,产品经理要求引入新的知识源:一个用于分析复杂实体关系的图数据库。在传统的 Rails 架构中,这意味着修改配置文件、添加新的初始化器、编写新的服务类,最后走完一整套 CI/CD 流程进行部署。整个周期耗时数天,完全违背了敏捷“快速验证”的核心思想。更糟糕的是,每次微调知识源的配置(例如,更换 ChromaDB 的集合或调整图数据库的连接池大小)都需要一次完整的重新部署。这不仅效率低下,而且风险极高。

痛点非常明确:我们的单体 Rails 应用缺乏一个动态管理其核心组件(知识源)的能力。我们需要一种机制,允许我们在运行时动态地加载、卸去和重配置这些知识源,而无需触碰应用代码或重启服务。这本质上是要求我们为 Rails 应用构建一个微内核与插件化的架构,并为其配备一个可靠的分布式控制平面。

初步构想与技术选型

我们的目标是实现一个 KnowledgeSource 的插件系统。每个插件都是一个独立的 Ruby模块,负责与一种特定的数据存储(如 ChromaDB, Neo4j, Elasticsearch 等)进行交互。应用的核心逻辑不直接依赖任何具体的插件实现,而是通过一个中心化的 PluginManager 来与当前已激活的插件进行交互。

关键问题在于,如何让分布在多个 Pod 或服务器上的 Rails 实例就“哪些插件是激活的”以及“它们的配置是什么”达成共识?

  1. 数据库轮询? 这是最简单的想法。在数据库中建一张 plugins 表,Rails 实例定期去查询。但这会产生大量无效的数据库请求,实时性也差,配置变更的感知延迟很高。在生产环境中,这是一种粗暴且低效的方案。
  2. Redis Pub/Sub? 当配置变更时,通过 Redis发布一个消息,所有 Rails 实例订阅并作出反应。这比轮询好,但存在可靠性问题。如果一个实例在发布消息时短暂离线,它将永久错过这次更新。它缺乏状态存储,无法让新启动的实例知道当前的“最终状态”。
  3. etcd 作为控制平面? 这是最吸引我们的方案。etcd 是一个为分布式系统设计的强一致性键值存储。它最关键的特性是 watch 机制。客户端可以“监视”一个键或一个目录(前缀),当其发生变化时,etcd 会立即通知客户端。这完美契合我们的需求:
    • 状态存储: etcd 持久化存储了所有插件的配置和启用状态。新实例启动时,只需从 etcd 读取一次,就能获得完整的当前状态。
    • 实时通知: watch 机制保证了配置变更能以极低的延迟推送到所有实例。
    • 强一致性: 基于 Raft 协议,etcd 保证了所有 Rails 实例看到的数据视图是强一致的,避免了因配置不统一导致的行为异常。

决策很明确:我们将使用 Rails 作为应用框架,实现一个插件管理器,并利用 etcd 作为这个分布式插件系统的控制平面。

架构设计

整体架构可以用下图清晰地表示:

graph TD
    subgraph "控制层 (Control Plane)"
        A[运维人员/CI脚本] -- "etcdctl put/del" --> B[etcd Cluster]
    end

    subgraph "应用层 (Application Plane) - Rails Pod/Instance 1"
        B -- "Watch Events (PUT, DELETE)" --> C1[PluginManager Watcher Thread]
        C1 -- "通知更新" --> D1[PluginManager]
        D1 -- "加载/卸去/重配" --> E1[Plugin Chunks]
        subgraph E1
            P1[ChromaDB Plugin]
            P2[GraphDB Plugin]
        end
        F1[Rails Application Core] -- "query(source, params)" --> D1
    end

    subgraph "应用层 (Application Plane) - Rails Pod/Instance 2"
        B -- "Watch Events (PUT, DELETE)" --> C2[PluginManager Watcher Thread]
        C2 -- "通知更新" --> D2[PluginManager]
        D2 -- "加载/卸去/重配" --> E2[Plugin Chunks]
        subgraph E2
            P3[ChromaDB Plugin]
            P4[GraphDB Plugin]
        end
        F2[Rails Application Core] -- "query(source, params)" --> D2
    end

    G[ChromaDB]
    H[Neo4j GraphDB]

    P1 & P3 --> G
    P2 & P4 --> H

数据流和控制流如下:

  1. 控制流: 运维人员通过 etcdctl 命令行工具向 etcd 集群写入或删除插件配置。例如,写入 /app/plugins/chroma_main 这个 key 来启用 ChromaDB 插件。
  2. 通知流: Rails 应用中的 PluginManager 会启动一个后台线程,持续 watch etcd 中 /app/plugins/ 这个前缀。当有任何 key 发生变化,etcd 会将事件推送给所有 watcher。
  3. 执行流: Watcher 线程收到事件后,解析出是哪个插件以及发生了什么变化(新增、更新、删除),然后通知主 PluginManager 执行相应的操作:加载新插件、重新配置现有插件或卸载插件。
  4. 数据流: Rails 的业务逻辑需要查询知识源时,它不直接调用具体的插件代码,而是调用 PluginManager.query(:chroma_main, {...})PluginManager 负责将请求路由到正确、且已激活的插件实例上。

步骤化实现

1. 定义插件接口规范

为了让 PluginManager 能够统一管理所有插件,我们首先需要定义一个标准的接口。在 Ruby 中,使用 module 来定义这种行为契约是自然的选择。

app/plugins/knowledge_source.rb:

# frozen_string_literal: true

# 所有知识源插件都必须包含此模块,并实现其定义的方法。
# 这为 PluginManager 提供了一个统一的交互契约。
module KnowledgeSource
  # @!method setup(config)
  #   使用从 etcd 获取的配置来初始化插件。
  #   这个方法必须是幂等的,可以被重复调用以应用新配置。
  #   例如,建立数据库连接池,初始化客户端等。
  #   @param config [Hash] 插件的具体配置
  #   @return [void]
  #
  # @!method query(params)
  #   执行查询的核心方法。
  #   @param params [Hash] 查询所需的参数
  #   @return [Hash] 结构化的查询结果
  #
  # @!method health_check
  #   检查插件所依赖的外部服务的健康状况。
  #   @return [Hash] 包含状态和可选信息的哈希,例如: { status: 'ok' }
  #
  # @!method teardown
  #   优雅地关闭插件,释放资源。
  #   例如,断开数据库连接。
  #   @return [void]
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def setup(config)
      raise NotImplementedError, "#{self.name} must implement the 'setup' class method."
    end

    def query(params)
      raise NotImplementedError, "#{self.name} must implement the 'query' class method."
    end

    def health_check
      raise NotImplementedError, "#{self.name} must implement the 'health_check' class method."
    end

    def teardown
      raise NotImplementedError, "#{self.name} must implement the 'teardown' class method."
    end
  end
end

这里的关键是定义了 setup, query, health_check, teardown 四个核心方法,构成了插件的生命周期。

2. 实现一个具体的 ChromaDB 插件

接下来,我们基于这个接口实现一个 ChromaDB 插件。

app/plugins/chroma_plugin.rb:

# frozen_string_literal: true

require 'chroma-db'
require_relative 'knowledge_source'

module ChromaPlugin
  include KnowledgeSource

  # 使用 concurrent-ruby 的 atomics 来确保线程安全
  @client = Concurrent::AtomicReference.new(nil)
  @collection = Concurrent::AtomicReference.new(nil)
  @logger = Rails.logger

  class << self
    def setup(config)
      @logger.info("[ChromaPlugin] Setting up with config: #{config.inspect}")
      
      # 参数校验是生产级代码的必要部分
      url = config['url']
      collection_name = config['collection_name']
      unless url && collection_name
        @logger.error("[ChromaPlugin] Missing required config: 'url' or 'collection_name'.")
        return false
      end
      
      # 优雅地关闭旧连接
      teardown
      
      begin
        client = ChromaDB::Client.new(url)
        # ChromaDB Ruby 客户端的 get_or_create_collection 是幂等的
        collection = client.get_or_create_collection(collection_name)
        
        @client.set(client)
        @collection.set(collection)
        @logger.info("[ChromaPlugin] Setup successful. Connected to #{url}, collection: #{collection_name}.")
        true
      rescue StandardError => e
        @logger.error("[ChromaPlugin] Failed to setup: #{e.message}")
        @client.set(nil)
        @collection.set(nil)
        false
      end
    end

    def query(params)
      collection_instance = @collection.get
      unless collection_instance
        return { error: 'ChromaPlugin is not properly configured or connected.' }
      end

      query_texts = params[:query_texts]
      n_results = params.fetch(:n_results, 5)

      # 确保输入是有效的
      return { error: 'query_texts must be an array of strings.' } unless query_texts.is_a?(Array)

      begin
        results = collection_instance.query(query_texts: query_texts, n_results: n_results)
        { data: results }
      rescue StandardError => e
        @logger.error("[ChromaPlugin] Query failed: #{e.message}")
        { error: "Query failed due to an internal error: #{e.class.name}" }
      end
    end

    def health_check
      client_instance = @client.get
      if client_instance.nil?
        return { status: 'down', reason: 'Not initialized.' }
      end

      begin
        # 心跳检查是一种常见的健康检查方式
        heartbeat = client_instance.heartbeat
        if heartbeat > 0 # 简单的检查,实际可以更复杂
          { status: 'ok', heartbeat: heartbeat }
        else
          { status: 'down', reason: 'Heartbeat check failed.' }
        end
      rescue StandardError => e
        { status: 'down', reason: e.message }
      end
    end

    def teardown
      client_instance = @client.get
      if client_instance
        @logger.info("[ChromaPlugin] Tearing down connection.")
        # Ruby 客户端没有显式的 close 方法,所以我们只需置空引用
        # 如果是数据库连接池,这里会执行 pool.disconnect!
        @client.set(nil)
        @collection.set(nil)
      end
    end
  end
end

这个实现有几个关键点:

  • 线程安全: 使用 Concurrent::AtomicReference 来存储客户端和集合实例,确保在多线程环境下(如 Puma 服务器)的读写安全。
  • 幂等性: setup 方法在被重复调用时,能够正确地处理旧连接,并建立新连接,这对于动态重配置至关重要。
  • 健壮性: 包含了详细的日志、参数校验和异常处理。这是区分玩具代码和生产代码的核心。

3. 构建核心 PluginManager 和 etcd Watcher

这是整个系统的核心,负责与 etcd 通信并管理插件生命周期。

config/initializers/plugin_manager.rb:

# frozen_string_literal: true

require 'etcdv3'
require 'concurrent'

# PluginManager 是一个单例,负责管理所有知识源插件的生命周期。
# 它在应用启动时初始化,并启动一个后台线程来监听 etcd 的变化。
module PluginManager
  extend self

  # 使用线程安全的 Concurrent::Hash 来存储插件实例
  @plugins = Concurrent::Hash.new
  @logger = Rails.logger
  @etcd_prefix = '/app/plugins/'

  def start
    @logger.info('[PluginManager] Starting...')
    @etcd = Etcdv3.new(endpoints: ENV.fetch('ETCD_ENDPOINTS', 'http://localhost:2379'))
    
    # 1. 初始加载
    initial_load

    # 2. 启动后台 watcher 线程
    start_watcher
    
    # 3. 注册应用退出时的清理钩子
    at_exit { shutdown }
    
    @logger.info('[PluginManager] Started successfully.')
  end

  def query(plugin_id, params)
    plugin_module = @plugins[plugin_id.to_sym]
    if plugin_module
      plugin_module.query(params)
    else
      { error: "Plugin '#{plugin_id}' not found or not enabled." }
    end
  end

  def status
    @plugins.keys.each_with_object({}) do |id, memo|
      memo[id] = @plugins[id].health_check
    end
  end

  private

  def initial_load
    @logger.info('[PluginManager] Performing initial load from etcd...')
    begin
      response = @etcd.get(@etcd_prefix, prefix: true)
      response.kvs.each do |kv|
        handle_put_event(kv)
      end
    rescue GRPC::Unavailable => e
      @logger.error("[PluginManager] Initial load failed. Cannot connect to etcd: #{e.message}. The application might not function correctly.")
    end
  end

  def start_watcher
    Thread.new do
      loop do
        @logger.info('[PluginManager Watcher] Starting to watch for changes...')
        begin
          @etcd.watch(@etcd_prefix, prefix: true) do |event|
            # event 是一个 Etcdserverpb::WatchResponse 对象
            event.events.each do |e|
              case e.type
              when :PUT
                @logger.info("[PluginManager Watcher] Received PUT event for key: #{e.kv.key.force_encoding('UTF-8')}")
                handle_put_event(e.kv)
              when :DELETE
                @logger.info("[PluginManager Watcher] Received DELETE event for key: #{e.kv.key.force_encoding('UTF-8')}")
                handle_delete_event(e.kv)
              end
            end
          end
        rescue GRPC::Unavailable => e
          @logger.error("[PluginManager Watcher] Watch connection to etcd lost: #{e.message}. Retrying in 5 seconds...")
          sleep 5
        rescue StandardError => e
          @logger.error("[PluginManager Watcher] An unexpected error occurred: #{e.message}. Backtrace: #{e.backtrace.join("\n")}")
          sleep 5 # 避免错误导致 CPU 100%
        end
      end
    end
  end
  
  def handle_put_event(kv)
    plugin_id = extract_plugin_id(kv.key)
    return unless plugin_id

    begin
      config = JSON.parse(kv.value)
      plugin_class_name = config['plugin_class']
      enabled = config.fetch('enabled', false)

      # 确保类存在且符合规范
      plugin_module = plugin_class_name.safe_constantize
      unless plugin_module && plugin_module.is_a?(Module) && plugin_module.singleton_class.ancestors.include?(KnowledgeSource::ClassMethods)
          @logger.error("[PluginManager] Invalid plugin class '#{plugin_class_name}' for id '#{plugin_id}'.")
          return
      end

      if enabled
        @logger.info("[PluginManager] Enabling/Updating plugin '#{plugin_id}'...")
        if plugin_module.setup(config['config'] || {})
          @plugins[plugin_id] = plugin_module
          @logger.info("[PluginManager] Plugin '#{plugin_id}' is now active.")
        else
          @logger.error("[PluginManager] Failed to setup plugin '#{plugin_id}'. It will be disabled.")
          unload_plugin(plugin_id) # 确保设置失败的插件被清理
        end
      else
        @logger.info("[PluginManager] Disabling plugin '#{plugin_id}' as per configuration.")
        unload_plugin(plugin_id)
      end
    rescue JSON::ParserError
      @logger.error("[PluginManager] Invalid JSON config for key: #{kv.key}")
    rescue StandardError => e
      @logger.error("[PluginManager] Error processing PUT event for '#{plugin_id}': #{e.message}")
    end
  end

  def handle_delete_event(kv)
    plugin_id = extract_plugin_id(kv.key)
    return unless plugin_id

    @logger.info("[PluginManager] Unloading plugin '#{plugin_id}' due to key deletion.")
    unload_plugin(plugin_id)
  end

  def unload_plugin(plugin_id)
    plugin_module = @plugins.delete(plugin_id)
    if plugin_module
      plugin_module.teardown
      @logger.info("[PluginManager] Plugin '#{plugin_id}' has been successfully unloaded.")
    end
  end
  
  def extract_plugin_id(key)
    key.delete_prefix(@etcd_prefix).to_sym
  end

  def shutdown
    @logger.info('[PluginManager] Shutting down all plugins...')
    @plugins.keys.each { |id| unload_plugin(id) }
  end
end

# 启动管理器
PluginManager.start

这份代码实现了整个系统的控制逻辑:

  • 启动流程: 在 Rails 初始化时启动,首先从 etcd 全量拉取一次现有配置,然后启动一个独立的 watch 线程。
  • Watch 循环: 这个循环是持久的。如果与 etcd 的连接断开,它会进入重试逻辑,保证了连接的韧性。
  • 事件处理: handle_put_eventhandle_delete_event 是核心。它们解析 etcd 事件,动态加载/卸载 Ruby 模块,并调用其生命周期方法。safe_constantize 用于安全地将字符串转换为类常量,防止代码注入风险。
  • 资源清理: at_exit 钩子确保了在应用正常退出时,所有插件都能被优雅地关闭,释放数据库连接等资源。

4. 实践操作:动态管理插件

现在,整个系统已经就位。我们可以通过 etcdctl 来实时操控应用的行为了。

1. 启用 ChromaDB 插件

创建一个 JSON 配置文件 chroma_config.json:

{
  "enabled": true,
  "plugin_class": "ChromaPlugin",
  "config": {
    "url": "http://chromadb:8000",
    "collection_name": "production_documents"
  }
}

然后通过 etcdctl 写入:

# 我们将这个配置写入到 /app/plugins/chroma_prod 这个 key 下
# 'chroma_prod' 将成为这个插件实例的唯一 ID
etcdctl put /app/plugins/chroma_prod "$(cat chroma_config.json)"

几乎在命令执行成功的瞬间,所有正在运行的 Rails 实例的日志中都会出现类似下面的输出:

I, [2023-10-27T10:35:01.123456 #1]  INFO -- : [PluginManager Watcher] Received PUT event for key: /app/plugins/chroma_prod
I, [2023-10-27T10:35:01.123500 #1]  INFO -- : [PluginManager] Enabling/Updating plugin 'chroma_prod'...
I, [2023-10-27T10:35:01.123600 #1]  INFO -- : [ChromaPlugin] Setting up with config: {"url"=>"http://chromadb:8000", "collection_name"=>"production_documents"}
I, [2023-10-27T10:35:01.250000 #1]  INFO -- : [ChromaPlugin] Setup successful. Connected to http://chromadb:8000, collection: production_documents.
I, [2023-10-27T10:35:01.250100 #1]  INFO -- : [PluginManager] Plugin 'chroma_prod' is now active.

此时,在 Rails console 中执行 PluginManager.query(:chroma_prod, query_texts: ["some query"]) 就能获得查询结果。

2. 动态修改配置

假设我们需要将 ChromaDB 指向一个新的集合。只需修改 JSON 文件:

{
  "enabled": true,
  "plugin_class": "ChromaPlugin",
  "config": {
    "url": "http://chromadb:8000",
    "collection_name": "new_experimental_collection"
  }
}

然后再次执行 put 命令覆盖原有的 key:

etcdctl put /app/plugins/chroma_prod "$(cat chroma_config.json)"

Rails 应用会再次收到 PUT 事件,PluginManager 会调用 ChromaPlugin.setup,插件内部逻辑会先 teardown 旧连接,再用新配置建立新连接,整个过程应用无感知、无中断。

3. 禁用或删除插件

要禁用一个插件,可以将 enabled 设为 false,或者直接删除 etcd 中的 key。

# 方式一:禁用
etcdctl put /app/plugins/chroma_prod '{"enabled": false, "plugin_class": "ChromaPlugin"}'

# 方式二:直接删除
etcdctl del /app/plugins/chroma_prod

无论哪种方式,Rails 实例都会收到相应的事件,并调用 unload_plugin 方法,执行 ChromaPlugin.teardown 并从 @plugins 哈希中移除该插件。之后任何对 :chroma_prodquery 调用都会失败。

遗留问题与未来迭代路径

这个架构成功地解决了我们最初的痛点,极大地提升了敏捷开发的效率和生产环境的灵活性。但它并非没有代价和局限性。

首先,系统的复杂度显著增加了。我们引入了 etcd 这个新的基础设施组件,需要专业的运维来保证其高可用性。整个团队也需要理解这种事件驱动、动态配置的模式,对开发和调试提出了更高的要求。

其次,当前的插件加载机制是基于 Ruby 的动态特性,但对于包含复杂依赖或需要编译原生扩展的 Gem 的插件,动态加载可能会遇到问题。一个更鲁棒的方案可能需要将插件打包成独立的 Gem,并通过某种机制来管理它们的依赖。

再者,安全性是一个重要的考量。目前任何能访问 etcd 的人都可以修改应用的行为。在生产环境中,必须为 etcd 的 /app/plugins/ 前缀配置严格的 ACLs,并对写入的配置值进行更严格的 Schema 校验,防止恶意或错误的配置导致应用崩溃。

未来的一个迭代方向是,可以围绕这个控制平面构建一个简单的管理后台 UI,让产品或运营人员也能以安全、可控的方式调整知识源配置,实现真正的业务驱动配置变更。另一个方向是探索将此模式扩展到应用的其他方面,比如动态管理后台任务的执行策略、A/B 测试的流量分配等,让整个 Rails 应用变得更加“活”起来。


  目录