团队在敏捷开发模式下推进一个复杂的 RAG (检索增强生成) 系统时,我们很快撞上了一堵墙。最初系统只对接了 ChromaDB,用于处理非结构化文本的向量检索。但随着业务迭代,产品经理要求引入新的知识源:一个用于分析复杂实体关系的图数据库。在传统的 Rails 架构中,这意味着修改配置文件、添加新的初始化器、编写新的服务类,最后走完一整套 CI/CD 流程进行部署。整个周期耗时数天,完全违背了敏捷“快速验证”的核心思想。更糟糕的是,每次微调知识源的配置(例如,更换 ChromaDB 的集合或调整图数据库的连接池大小)都需要一次完整的重新部署。这不仅效率低下,而且风险极高。
痛点非常明确:我们的单体 Rails 应用缺乏一个动态管理其核心组件(知识源)的能力。我们需要一种机制,允许我们在运行时动态地加载、卸去和重配置这些知识源,而无需触碰应用代码或重启服务。这本质上是要求我们为 Rails 应用构建一个微内核与插件化的架构,并为其配备一个可靠的分布式控制平面。
初步构想与技术选型
我们的目标是实现一个 KnowledgeSource
的插件系统。每个插件都是一个独立的 Ruby模块,负责与一种特定的数据存储(如 ChromaDB, Neo4j, Elasticsearch 等)进行交互。应用的核心逻辑不直接依赖任何具体的插件实现,而是通过一个中心化的 PluginManager
来与当前已激活的插件进行交互。
关键问题在于,如何让分布在多个 Pod 或服务器上的 Rails 实例就“哪些插件是激活的”以及“它们的配置是什么”达成共识?
- 数据库轮询? 这是最简单的想法。在数据库中建一张
plugins
表,Rails 实例定期去查询。但这会产生大量无效的数据库请求,实时性也差,配置变更的感知延迟很高。在生产环境中,这是一种粗暴且低效的方案。 - Redis Pub/Sub? 当配置变更时,通过 Redis发布一个消息,所有 Rails 实例订阅并作出反应。这比轮询好,但存在可靠性问题。如果一个实例在发布消息时短暂离线,它将永久错过这次更新。它缺乏状态存储,无法让新启动的实例知道当前的“最终状态”。
- etcd 作为控制平面? 这是最吸引我们的方案。etcd 是一个为分布式系统设计的强一致性键值存储。它最关键的特性是
watch
机制。客户端可以“监视”一个键或一个目录(前缀),当其发生变化时,etcd 会立即通知客户端。这完美契合我们的需求:- 状态存储: etcd 持久化存储了所有插件的配置和启用状态。新实例启动时,只需从 etcd 读取一次,就能获得完整的当前状态。
- 实时通知:
watch
机制保证了配置变更能以极低的延迟推送到所有实例。 - 强一致性: 基于 Raft 协议,etcd 保证了所有 Rails 实例看到的数据视图是强一致的,避免了因配置不统一导致的行为异常。
决策很明确:我们将使用 Rails 作为应用框架,实现一个插件管理器,并利用 etcd
作为这个分布式插件系统的控制平面。
架构设计
整体架构可以用下图清晰地表示:
graph TD subgraph "控制层 (Control Plane)" A[运维人员/CI脚本] -- "etcdctl put/del" --> B[etcd Cluster] end subgraph "应用层 (Application Plane) - Rails Pod/Instance 1" B -- "Watch Events (PUT, DELETE)" --> C1[PluginManager Watcher Thread] C1 -- "通知更新" --> D1[PluginManager] D1 -- "加载/卸去/重配" --> E1[Plugin Chunks] subgraph E1 P1[ChromaDB Plugin] P2[GraphDB Plugin] end F1[Rails Application Core] -- "query(source, params)" --> D1 end subgraph "应用层 (Application Plane) - Rails Pod/Instance 2" B -- "Watch Events (PUT, DELETE)" --> C2[PluginManager Watcher Thread] C2 -- "通知更新" --> D2[PluginManager] D2 -- "加载/卸去/重配" --> E2[Plugin Chunks] subgraph E2 P3[ChromaDB Plugin] P4[GraphDB Plugin] end F2[Rails Application Core] -- "query(source, params)" --> D2 end G[ChromaDB] H[Neo4j GraphDB] P1 & P3 --> G P2 & P4 --> H
数据流和控制流如下:
- 控制流: 运维人员通过
etcdctl
命令行工具向 etcd 集群写入或删除插件配置。例如,写入/app/plugins/chroma_main
这个 key 来启用 ChromaDB 插件。 - 通知流: Rails 应用中的
PluginManager
会启动一个后台线程,持续 watch etcd 中/app/plugins/
这个前缀。当有任何 key 发生变化,etcd 会将事件推送给所有 watcher。 - 执行流: Watcher 线程收到事件后,解析出是哪个插件以及发生了什么变化(新增、更新、删除),然后通知主
PluginManager
执行相应的操作:加载新插件、重新配置现有插件或卸载插件。 - 数据流: Rails 的业务逻辑需要查询知识源时,它不直接调用具体的插件代码,而是调用
PluginManager.query(:chroma_main, {...})
。PluginManager
负责将请求路由到正确、且已激活的插件实例上。
步骤化实现
1. 定义插件接口规范
为了让 PluginManager
能够统一管理所有插件,我们首先需要定义一个标准的接口。在 Ruby 中,使用 module
来定义这种行为契约是自然的选择。
app/plugins/knowledge_source.rb
:
# frozen_string_literal: true
# 所有知识源插件都必须包含此模块,并实现其定义的方法。
# 这为 PluginManager 提供了一个统一的交互契约。
module KnowledgeSource
# @!method setup(config)
# 使用从 etcd 获取的配置来初始化插件。
# 这个方法必须是幂等的,可以被重复调用以应用新配置。
# 例如,建立数据库连接池,初始化客户端等。
# @param config [Hash] 插件的具体配置
# @return [void]
#
# @!method query(params)
# 执行查询的核心方法。
# @param params [Hash] 查询所需的参数
# @return [Hash] 结构化的查询结果
#
# @!method health_check
# 检查插件所依赖的外部服务的健康状况。
# @return [Hash] 包含状态和可选信息的哈希,例如: { status: 'ok' }
#
# @!method teardown
# 优雅地关闭插件,释放资源。
# 例如,断开数据库连接。
# @return [void]
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def setup(config)
raise NotImplementedError, "#{self.name} must implement the 'setup' class method."
end
def query(params)
raise NotImplementedError, "#{self.name} must implement the 'query' class method."
end
def health_check
raise NotImplementedError, "#{self.name} must implement the 'health_check' class method."
end
def teardown
raise NotImplementedError, "#{self.name} must implement the 'teardown' class method."
end
end
end
这里的关键是定义了 setup
, query
, health_check
, teardown
四个核心方法,构成了插件的生命周期。
2. 实现一个具体的 ChromaDB 插件
接下来,我们基于这个接口实现一个 ChromaDB 插件。
app/plugins/chroma_plugin.rb
:
# frozen_string_literal: true
require 'chroma-db'
require_relative 'knowledge_source'
module ChromaPlugin
include KnowledgeSource
# 使用 concurrent-ruby 的 atomics 来确保线程安全
@client = Concurrent::AtomicReference.new(nil)
@collection = Concurrent::AtomicReference.new(nil)
@logger = Rails.logger
class << self
def setup(config)
@logger.info("[ChromaPlugin] Setting up with config: #{config.inspect}")
# 参数校验是生产级代码的必要部分
url = config['url']
collection_name = config['collection_name']
unless url && collection_name
@logger.error("[ChromaPlugin] Missing required config: 'url' or 'collection_name'.")
return false
end
# 优雅地关闭旧连接
teardown
begin
client = ChromaDB::Client.new(url)
# ChromaDB Ruby 客户端的 get_or_create_collection 是幂等的
collection = client.get_or_create_collection(collection_name)
@client.set(client)
@collection.set(collection)
@logger.info("[ChromaPlugin] Setup successful. Connected to #{url}, collection: #{collection_name}.")
true
rescue StandardError => e
@logger.error("[ChromaPlugin] Failed to setup: #{e.message}")
@client.set(nil)
@collection.set(nil)
false
end
end
def query(params)
collection_instance = @collection.get
unless collection_instance
return { error: 'ChromaPlugin is not properly configured or connected.' }
end
query_texts = params[:query_texts]
n_results = params.fetch(:n_results, 5)
# 确保输入是有效的
return { error: 'query_texts must be an array of strings.' } unless query_texts.is_a?(Array)
begin
results = collection_instance.query(query_texts: query_texts, n_results: n_results)
{ data: results }
rescue StandardError => e
@logger.error("[ChromaPlugin] Query failed: #{e.message}")
{ error: "Query failed due to an internal error: #{e.class.name}" }
end
end
def health_check
client_instance = @client.get
if client_instance.nil?
return { status: 'down', reason: 'Not initialized.' }
end
begin
# 心跳检查是一种常见的健康检查方式
heartbeat = client_instance.heartbeat
if heartbeat > 0 # 简单的检查,实际可以更复杂
{ status: 'ok', heartbeat: heartbeat }
else
{ status: 'down', reason: 'Heartbeat check failed.' }
end
rescue StandardError => e
{ status: 'down', reason: e.message }
end
end
def teardown
client_instance = @client.get
if client_instance
@logger.info("[ChromaPlugin] Tearing down connection.")
# Ruby 客户端没有显式的 close 方法,所以我们只需置空引用
# 如果是数据库连接池,这里会执行 pool.disconnect!
@client.set(nil)
@collection.set(nil)
end
end
end
end
这个实现有几个关键点:
- 线程安全: 使用
Concurrent::AtomicReference
来存储客户端和集合实例,确保在多线程环境下(如 Puma 服务器)的读写安全。 - 幂等性:
setup
方法在被重复调用时,能够正确地处理旧连接,并建立新连接,这对于动态重配置至关重要。 - 健壮性: 包含了详细的日志、参数校验和异常处理。这是区分玩具代码和生产代码的核心。
3. 构建核心 PluginManager
和 etcd Watcher
这是整个系统的核心,负责与 etcd 通信并管理插件生命周期。
config/initializers/plugin_manager.rb
:
# frozen_string_literal: true
require 'etcdv3'
require 'concurrent'
# PluginManager 是一个单例,负责管理所有知识源插件的生命周期。
# 它在应用启动时初始化,并启动一个后台线程来监听 etcd 的变化。
module PluginManager
extend self
# 使用线程安全的 Concurrent::Hash 来存储插件实例
@plugins = Concurrent::Hash.new
@logger = Rails.logger
@etcd_prefix = '/app/plugins/'
def start
@logger.info('[PluginManager] Starting...')
@etcd = Etcdv3.new(endpoints: ENV.fetch('ETCD_ENDPOINTS', 'http://localhost:2379'))
# 1. 初始加载
initial_load
# 2. 启动后台 watcher 线程
start_watcher
# 3. 注册应用退出时的清理钩子
at_exit { shutdown }
@logger.info('[PluginManager] Started successfully.')
end
def query(plugin_id, params)
plugin_module = @plugins[plugin_id.to_sym]
if plugin_module
plugin_module.query(params)
else
{ error: "Plugin '#{plugin_id}' not found or not enabled." }
end
end
def status
@plugins.keys.each_with_object({}) do |id, memo|
memo[id] = @plugins[id].health_check
end
end
private
def initial_load
@logger.info('[PluginManager] Performing initial load from etcd...')
begin
response = @etcd.get(@etcd_prefix, prefix: true)
response.kvs.each do |kv|
handle_put_event(kv)
end
rescue GRPC::Unavailable => e
@logger.error("[PluginManager] Initial load failed. Cannot connect to etcd: #{e.message}. The application might not function correctly.")
end
end
def start_watcher
Thread.new do
loop do
@logger.info('[PluginManager Watcher] Starting to watch for changes...')
begin
@etcd.watch(@etcd_prefix, prefix: true) do |event|
# event 是一个 Etcdserverpb::WatchResponse 对象
event.events.each do |e|
case e.type
when :PUT
@logger.info("[PluginManager Watcher] Received PUT event for key: #{e.kv.key.force_encoding('UTF-8')}")
handle_put_event(e.kv)
when :DELETE
@logger.info("[PluginManager Watcher] Received DELETE event for key: #{e.kv.key.force_encoding('UTF-8')}")
handle_delete_event(e.kv)
end
end
end
rescue GRPC::Unavailable => e
@logger.error("[PluginManager Watcher] Watch connection to etcd lost: #{e.message}. Retrying in 5 seconds...")
sleep 5
rescue StandardError => e
@logger.error("[PluginManager Watcher] An unexpected error occurred: #{e.message}. Backtrace: #{e.backtrace.join("\n")}")
sleep 5 # 避免错误导致 CPU 100%
end
end
end
end
def handle_put_event(kv)
plugin_id = extract_plugin_id(kv.key)
return unless plugin_id
begin
config = JSON.parse(kv.value)
plugin_class_name = config['plugin_class']
enabled = config.fetch('enabled', false)
# 确保类存在且符合规范
plugin_module = plugin_class_name.safe_constantize
unless plugin_module && plugin_module.is_a?(Module) && plugin_module.singleton_class.ancestors.include?(KnowledgeSource::ClassMethods)
@logger.error("[PluginManager] Invalid plugin class '#{plugin_class_name}' for id '#{plugin_id}'.")
return
end
if enabled
@logger.info("[PluginManager] Enabling/Updating plugin '#{plugin_id}'...")
if plugin_module.setup(config['config'] || {})
@plugins[plugin_id] = plugin_module
@logger.info("[PluginManager] Plugin '#{plugin_id}' is now active.")
else
@logger.error("[PluginManager] Failed to setup plugin '#{plugin_id}'. It will be disabled.")
unload_plugin(plugin_id) # 确保设置失败的插件被清理
end
else
@logger.info("[PluginManager] Disabling plugin '#{plugin_id}' as per configuration.")
unload_plugin(plugin_id)
end
rescue JSON::ParserError
@logger.error("[PluginManager] Invalid JSON config for key: #{kv.key}")
rescue StandardError => e
@logger.error("[PluginManager] Error processing PUT event for '#{plugin_id}': #{e.message}")
end
end
def handle_delete_event(kv)
plugin_id = extract_plugin_id(kv.key)
return unless plugin_id
@logger.info("[PluginManager] Unloading plugin '#{plugin_id}' due to key deletion.")
unload_plugin(plugin_id)
end
def unload_plugin(plugin_id)
plugin_module = @plugins.delete(plugin_id)
if plugin_module
plugin_module.teardown
@logger.info("[PluginManager] Plugin '#{plugin_id}' has been successfully unloaded.")
end
end
def extract_plugin_id(key)
key.delete_prefix(@etcd_prefix).to_sym
end
def shutdown
@logger.info('[PluginManager] Shutting down all plugins...')
@plugins.keys.each { |id| unload_plugin(id) }
end
end
# 启动管理器
PluginManager.start
这份代码实现了整个系统的控制逻辑:
- 启动流程: 在 Rails 初始化时启动,首先从 etcd 全量拉取一次现有配置,然后启动一个独立的
watch
线程。 - Watch 循环: 这个循环是持久的。如果与 etcd 的连接断开,它会进入重试逻辑,保证了连接的韧性。
- 事件处理:
handle_put_event
和handle_delete_event
是核心。它们解析 etcd 事件,动态加载/卸载 Ruby 模块,并调用其生命周期方法。safe_constantize
用于安全地将字符串转换为类常量,防止代码注入风险。 - 资源清理:
at_exit
钩子确保了在应用正常退出时,所有插件都能被优雅地关闭,释放数据库连接等资源。
4. 实践操作:动态管理插件
现在,整个系统已经就位。我们可以通过 etcdctl
来实时操控应用的行为了。
1. 启用 ChromaDB 插件
创建一个 JSON 配置文件 chroma_config.json
:
{
"enabled": true,
"plugin_class": "ChromaPlugin",
"config": {
"url": "http://chromadb:8000",
"collection_name": "production_documents"
}
}
然后通过 etcdctl
写入:
# 我们将这个配置写入到 /app/plugins/chroma_prod 这个 key 下
# 'chroma_prod' 将成为这个插件实例的唯一 ID
etcdctl put /app/plugins/chroma_prod "$(cat chroma_config.json)"
几乎在命令执行成功的瞬间,所有正在运行的 Rails 实例的日志中都会出现类似下面的输出:
I, [2023-10-27T10:35:01.123456 #1] INFO -- : [PluginManager Watcher] Received PUT event for key: /app/plugins/chroma_prod
I, [2023-10-27T10:35:01.123500 #1] INFO -- : [PluginManager] Enabling/Updating plugin 'chroma_prod'...
I, [2023-10-27T10:35:01.123600 #1] INFO -- : [ChromaPlugin] Setting up with config: {"url"=>"http://chromadb:8000", "collection_name"=>"production_documents"}
I, [2023-10-27T10:35:01.250000 #1] INFO -- : [ChromaPlugin] Setup successful. Connected to http://chromadb:8000, collection: production_documents.
I, [2023-10-27T10:35:01.250100 #1] INFO -- : [PluginManager] Plugin 'chroma_prod' is now active.
此时,在 Rails console 中执行 PluginManager.query(:chroma_prod, query_texts: ["some query"])
就能获得查询结果。
2. 动态修改配置
假设我们需要将 ChromaDB 指向一个新的集合。只需修改 JSON 文件:
{
"enabled": true,
"plugin_class": "ChromaPlugin",
"config": {
"url": "http://chromadb:8000",
"collection_name": "new_experimental_collection"
}
}
然后再次执行 put
命令覆盖原有的 key:
etcdctl put /app/plugins/chroma_prod "$(cat chroma_config.json)"
Rails 应用会再次收到 PUT
事件,PluginManager
会调用 ChromaPlugin.setup
,插件内部逻辑会先 teardown
旧连接,再用新配置建立新连接,整个过程应用无感知、无中断。
3. 禁用或删除插件
要禁用一个插件,可以将 enabled
设为 false
,或者直接删除 etcd 中的 key。
# 方式一:禁用
etcdctl put /app/plugins/chroma_prod '{"enabled": false, "plugin_class": "ChromaPlugin"}'
# 方式二:直接删除
etcdctl del /app/plugins/chroma_prod
无论哪种方式,Rails 实例都会收到相应的事件,并调用 unload_plugin
方法,执行 ChromaPlugin.teardown
并从 @plugins
哈希中移除该插件。之后任何对 :chroma_prod
的 query
调用都会失败。
遗留问题与未来迭代路径
这个架构成功地解决了我们最初的痛点,极大地提升了敏捷开发的效率和生产环境的灵活性。但它并非没有代价和局限性。
首先,系统的复杂度显著增加了。我们引入了 etcd 这个新的基础设施组件,需要专业的运维来保证其高可用性。整个团队也需要理解这种事件驱动、动态配置的模式,对开发和调试提出了更高的要求。
其次,当前的插件加载机制是基于 Ruby 的动态特性,但对于包含复杂依赖或需要编译原生扩展的 Gem 的插件,动态加载可能会遇到问题。一个更鲁棒的方案可能需要将插件打包成独立的 Gem,并通过某种机制来管理它们的依赖。
再者,安全性是一个重要的考量。目前任何能访问 etcd 的人都可以修改应用的行为。在生产环境中,必须为 etcd 的 /app/plugins/
前缀配置严格的 ACLs,并对写入的配置值进行更严格的 Schema 校验,防止恶意或错误的配置导致应用崩溃。
未来的一个迭代方向是,可以围绕这个控制平面构建一个简单的管理后台 UI,让产品或运营人员也能以安全、可控的方式调整知识源配置,实现真正的业务驱动配置变更。另一个方向是探索将此模式扩展到应用的其他方面,比如动态管理后台任务的执行策略、A/B 测试的流量分配等,让整个 Rails 应用变得更加“活”起来。