我们需要一个存储系统,它不仅要记录当前状态,还要忠实地记录每一次状态变更的完整历史。对于金融风控规则、关键基础设施配置或任何需要强审计性的业务实体,追溯“在某个特定时间点,这个对象的状态是什么”至关重要。传统的数据库通过 updated_at 字段和日志表来解决这个问题,但这往往是事后补救,查询复杂,且无法保证变更的原子性和完整性。
Event Sourcing 是一个优雅的模型,但从零实现一个生产级的 Event Store 成本高昂。这让我们开始思考,我们是否能利用一个早已存在、身经百战的工具来充当我们的事实来源?这个工具就是 Git。Git 本质上是一个内容寻址的、不可变的、带有时间戳的快照日志。这听起来非常像一个 Event Store。
基于这个构想,我们决定用 Clojure 构建一个系统。该系统将 Git 作为写入模型和事件存储,通过 CQRS 模式将变更投射到一个优化的读取模型中,最终通过 RESTful API 提供强大的时序查询能力。Clojure 的不可变数据结构和函数式编程范式与 Git 的工作哲学天然契合。
架构蓝图:命令、提交与投射
整个系统的核心是 CQRS(命令查询职责分离)模式。Git 本身不适合快速随机读取,因此我们必须将读写路径分开。
- 写入模型 (Command Model): 所有的状态变更请求(命令)都会被转换成对一个 Git 仓库的提交 (Commit)。每个业务实体(例如一个风控规则)在仓库中表现为一个文件(如
rules/rule-123.json)。修改规则就是提交该文件的新版本。Commit Message 则记录了变更的元数据:谁、为什么、何时。 - 事件总线 (Event Bus): Git 的提交日志
git log天然就是我们的事件流。每个 commit 都是一个不可变的事件。 - 读取模型 (Query Model): 一个独立的后台进程(我们称之为“投射器” Projector)会监听 Git 仓库的变更。它会读取新的 commit,解析其中的文件内容,并将其“投射”到一个为快速查询而优化的数据结构中。在这个实现里,我们将使用一个内存中的 Clojure
atom来保存读取模型,以支持时序查询。 - API 层 (API Layer): 提供标准的 RESTful 接口。
POST或PUT请求对应写入模型的命令,而GET请求则服务于读取模型,包括获取最新状态和指定时间点的历史状态。
下面是这个架构的流程图。
sequenceDiagram
participant Client
participant API
participant CommandHandler
participant GitRepo as "Git Repository (Write Model)"
participant Projector as "Projector (Background Process)"
participant ReadModel as "In-Memory Read Model"
Client->>+API: POST /api/rules (body: rule data)
API->>+CommandHandler: handle-update-rule(rule)
CommandHandler->>+GitRepo: 1. Writes rule-123.json
CommandHandler->>+GitRepo: 2. git add .
CommandHandler->>+GitRepo: 3. git commit -m "Update rule 123"
GitRepo-->>-CommandHandler: Commit successful (SHA: abc1234)
CommandHandler-->>-API: { "commitId": "abc1234" }
API-->>-Client: 202 OK
loop Poll for changes
Projector->>+GitRepo: git log --since=
GitRepo-->>-Projector: New commits [abc1234, def5678]
Projector->>+GitRepo: For each commit, git show :
GitRepo-->>-Projector: File content at that commit
Projector->>+ReadModel: update-read-model(commit_sha, timestamp, entity_id, content)
end
Client->>+API: GET /api/rules/123?as-of=2023-10-26T12:00:00Z
API->>+ReadModel: query-rule-as-of("123", "2023-10-26T12:00:00Z")
ReadModel-->>-API: Rule state at specified time
API-->>-Client: 200 OK (rule data)
环境与依赖配置
我们的项目使用 deps.edn 进行依赖管理。核心依赖包括用于 Git 操作的 org.eclipse.jgit/org.eclipse.jgit,用于 Web 服务的 ring 和 compojure,以及 JSON 处理的 cheshire。
;; deps.edn
{:paths ["src"]
:deps {org.clojure/clojure {:mvn/version "1.11.1"}
;; Git 操作
org.eclipse.jgit/org.eclipse.jgit {:mvn/version "6.7.0.202309050840-r"}
;; Web 服务器与路由
ring/ring-core {:mvn/version "1.10.0"}
ring/ring-jetty-adapter {:mvn/version "1.10.0"}
ring/ring-json {:mvn/version "0.5.1"}
compojure/compojure {:mvn/version "1.7.0"}
;; JSON 序列化
cheshire/cheshire {:mvn/version "5.12.0"}
;; 日志
org.clojure/tools.logging {:mvn/version "1.2.4"}
ch.qos.logback/logback-classic {:mvn/version "1.4.11"}}}
写入模型:与 JGit 的深度集成
写入模型的核心是安全、原子地与 Git 仓库交互。我们将其抽象为一个 protocol,以便未来可以替换实现(例如,从本地文件系统切换到内存中的 Git 实现进行测试)。
(ns git-cqrs.git-store
(:require [clojure.java.io :as io]
[clojure.tools.logging :as log])
(:import (org.eclipse.jgit.api Git)
(org.eclipse.jgit.lib Repository PersonIdent)
(org.eclipse.jgit.revwalk RevWalk)
(java.io File)
(java.util.concurrentlocks ReentrantLock)))
;;; 定义一个协议来抽象 Git 操作
(defprotocol IGitStore
(commit-entity! [this entity-type entity-id content author message] "将实体内容提交到 Git 仓库")
(read-entity-at [this commit-ish entity-type entity-id] "读取指定 commit 的实体内容")
(get-commit-log [this since-commit-sha] "获取某个 commit 之后的所有提交日志"))
;;; 基于 JGit 的文件系统实现
(defrecord JGitStore [^Git git-handle ^File repo-path ^ReentrantLock write-lock]
IGitStore
(commit-entity! [this entity-type entity-id content author message]
;; 在真实项目中,并发写入 Git 仓库是一个严重问题。
;; JGit 本身不是完全线程安全的,尤其是对于会修改索引和工作区的操作。
;; 这里使用一个简单的 ReentrantLock 来保证写入操作的串行化。
;; 在高并发场景下,这会成为瓶颈,可能需要一个专用的 actor 或写入队列来处理。
(.lock write-lock)
(try
(let [repo-dir (.getRepository git-handle)
file-path (str (name entity-type) "/" (str entity-id) ".json")
target-file (io/file repo-path file-path)]
(io/make-parents target-file)
(spit target-file content)
(-> (.add git-handle) (.addFilepattern file-path) .call)
(let [committer (PersonIdent. "System" "[email protected]")
author-ident (PersonIdent. (:name author) (:email author))]
(-> (.commit git-handle)
(.setAuthor author-ident)
(.setCommitter committer)
(.setMessage message)
.call)))
(catch Exception e
(log/error e "Failed to commit entity" {:entity-id entity-id})
;; 简单的错误处理,真实场景需要更复杂的重试或补偿逻辑
(throw (ex-info "Git commit failed" {:entity-id entity-id} e)))
(finally
(.unlock write-lock))))
(read-entity-at [this commit-ish entity-type entity-id]
;; ... 实现读取逻辑,稍后详述 ...
)
(get-commit-log [this since-commit-sha]
;; ... 实现日志拉取,稍后详述 ...
))
(defn new-jgit-store [path-str]
(let [repo-path (io/file path-str)
git-handle (if (.exists (io/file repo-path ".git"))
(Git/open repo-path)
(do
(log/info "Initializing new Git repository at" path-str)
(Git/init)
(.setDirectory (io/file path-str))
(.call)))]
(map->JGitStore {:git-handle git-handle
:repo-path repo-path
:write-lock (ReentrantLock.)})))
;;; 用法示例
;; (def store (new-jgit-store "/tmp/my-git-db"))
;; (commit-entity! store :rules "rule-123" "{ \"field\": \"amount\", \"op\": \">\", \"value\": 1000 }" {:name "admin" :email "[email protected]"} "Initial rule creation")
这段代码的核心在于 commit-entity! 函数。我们通过一个 ReentrantLock 保证了对 Git 写操作的线程安全,这是一个务实但有性能局限的方案。在生产环境中,任何可能导致仓库状态不一致的并发写都必须被阻止。
读取模型:投射器与时序数据结构
投射器的职责是从 Git 拉取最新的变更,并更新内存中的读取模型。我们的读取模型需要支持“as-of”查询,即查询任意时间点的状态。一个 sorted-map 是实现此功能的理想数据结构。
我们的读取模型 state 将会是一个 atom,其结构如下:
{
:rules { ; entity-type
"rule-123" { ; entity-id
;; 时间戳作为 key,自动排序
;; value 是 commit SHA 和解析后的数据
1672531200000 {:commit "abc1234", :data {...}},
1672534800000 {:commit "def5678", :data {...}}
}
},
:last-processed-sha "def5678" ; 最后处理的 commit SHA
}
1. 实现 Git 日志拉取和内容读取
我们需要在 JGitStore 中补全读取相关的函数。
;; 在 JGitStore record 中添加/修改
(defrecord JGitStore [^Git git-handle ^File repo-path ^ReentrantLock write-lock]
IGitStore
(commit-entity! [this entity-type entity-id content author message]
;; ... 如上 ...
)
(read-entity-at [this commit-ish entity-type entity-id]
(let [repo (.getRepository git-handle)
file-path (str (name entity-type) "/" (str entity-id) ".json")
commit-id (.resolve repo commit-ish)]
(when commit-id
(with-open [rev-walk (RevWalk. repo)
tree-walk (org.eclipse.jgit.treewalk.TreeWalk. repo)]
(let [commit (.parseCommit rev-walk commit-id)
tree (.getTree commit)]
(.addTree tree-walk tree)
(.setRecursive tree-walk true)
(.setFilter tree-walk (org.eclipse.jgit.treewalk.filter.PathFilter/create file-path))
(when (.next tree-walk)
(let [object-id (.getObjectId tree-walk 0)
loader (.open repo object-id)]
(String. (.getBytes loader) "UTF-8"))))))))
(get-commit-log [this since-commit-sha]
(let [repo (.getRepository git-handle)
log-cmd (.log git-handle)]
;; 如果提供了 since-commit-sha,就从那个 commit 的下一个开始
(when-let [start-commit (and since-commit-sha (.resolve repo since-commit-sha))]
(.not log-cmd start-commit))
(->> (.call log-cmd)
;; JGit 返回的是最新的 commit 在前,我们需要反转它以保证事件处理的顺序
reverse
(map (fn [commit]
{:sha (.getName commit)
:timestamp (* 1000 (.getCommitTime commit)) ; 转换为毫秒
:message (.getFullMessage commit)
:author {:name (-> commit .getAuthorIdent .getName)
:email (-> commit .getAuthorIdent .getEmailAddress)}}))))))
2. 投射器进程
投射器是一个简单的后台循环,它定期检查新的 commit 并更新 atom 状态。
(ns git-cqrs.projector
(:require [git-cqrs.git-store :as store]
[cheshire.core :as json]
[clojure.tools.logging :as log]))
;; 全局的读取模型状态
(def read-model (atom {:last-processed-sha nil}))
(defn- parse-entity-from-path [file-path]
(let [parts (clojure.string/split file-path #"/")]
(when (>= (count parts) 2)
{:type (keyword (first parts))
:id (-> (second parts)
(clojure.string/replace #".json$" ""))})))
(defn- process-commit [git-store commit]
(let [repo (.getRepository (:git-handle git-store))]
(with-open [rev-walk (RevWalk. repo)
tree-walk (org.eclipse.jgit.treewalk.TreeWalk. repo)]
(let [commit-obj (.parseCommit rev-walk (.resolve repo (:sha commit)))
tree (.getTree commit-obj)
parent-tree (when (pos? (.getParentCount commit-obj))
(->> 0 (.getParent commit-obj) (.parseCommit rev-walk) .getTree))]
(.addTree tree-walk tree)
(when parent-tree (.addTree tree-walk parent-tree))
(.setRecursive tree-walk true)
;; 使用 DiffFormatter 找出变更的文件
(let [diff-formatter (org.eclipse.jgit.diff.DiffFormatter. org.eclipse.jgit.util.io.DisabledOutputStream/INSTANCE)]
(.setRepository diff-formatter repo)
(->> (.scan diff-formatter (when parent-tree parent-tree) tree)
(map (fn [diff-entry]
;; 我们只关心修改和新增的文件
(let [path (if (= (.getChangeType diff-entry) org.eclipse.jgit.diff.DiffEntry$ChangeType/DELETE)
(.getOldPath diff-entry)
(.getNewPath diff-entry))]
path)))
(distinct)
(map (fn [path]
(when-let [{:keys [type id]} (parse-entity-from-path path)]
(let [content (store/read-entity-at git-store (:sha commit) type id)
data (try (json/parse-string content true) (catch Exception _ nil))]
(when data
(log/info "Projecting entity update" {:type type :id id :commit (:sha commit)})
(swap! read-model
(fn [current-model]
(-> current-model
(assoc-in [type id (:timestamp commit)] {:commit (:sha commit) :data data})
(assoc :last-processed-sha (:sha commit))))))))))
doall))))))
(defn run-projector! [git-store]
(log/info "Starting projector...")
(future
(while (not (.. Thread currentThread isInterrupted))
(try
(let [last-sha (get @read-model :last-processed-sha)
new-commits (store/get-commit-log git-store last-sha)]
(when (seq new-commits)
(log/info "Found" (count new-commits) "new commits to project.")
(doseq [commit new-commits]
(process-commit git-store commit))))
(catch Exception e
(log/error e "Error in projector loop")))
(Thread/sleep 5000)))) ;; 轮询周期,生产环境应使用更优雅的机制,如文件系统通知
这个投射器实现了一个关键点:它通过比较 commit 与其父 commit 的 tree 对象来找出变更的文件。这比遍历每个 commit 中的所有文件要高效得多。
查询服务与 RESTful API
最后一步是暴露 HTTP 接口。我们将使用 Compojure 来定义路由。
1. 查询逻辑
查询处理器直接从 read-model 这个 atom 中读取数据。
(ns git-cqrs.query-service
(:require [git-cqrs.projector :refer [read-model]]))
(defn find-latest-before [sorted-versions timestamp]
;; rseq a sorted-map returns a reverse chronological sequence
(->> (rseq sorted-versions)
(filter (fn [[ts _]] (<= ts timestamp)))
first))
(defn get-entity-as-of [entity-type entity-id timestamp]
(let [versions (get-in @read-model [entity-type entity-id])]
(when versions
(if-let [[_ value] (find-latest-before versions timestamp)]
(:data value)
nil))))
(defn get-entity-latest [entity-type entity-id]
(let [versions (get-in @read-model [entity-type entity-id])]
(when versions
(-> versions last val :data))))
(defn get-entity-history [entity-type entity-id]
(let [versions (get-in @read-model [entity-type entity-id])]
(map (fn [[ts {:keys [commit data]}]]
{:timestamp ts :commit commit :data data})
versions)))
find-latest-before 是时序查询的核心。它利用 sorted-map 的特性,高效地找到在指定时间点之前(或恰好在该时间点)的最后一个版本。
2. API 路由
(ns git-cqrs.api
(:require [compojure.core :refer :all]
[compojure.route :as route]
[ring.middleware.json :refer [wrap-json-response wrap-json-body]]
[ring.middleware.params :refer [wrap-params]]
[cheshire.core :as json]
[git-cqrs.git-store :as store]
[git-cqrs.query-service :as query]))
(defn- parse-timestamp [ts-str]
(try
(Long/parseLong ts-str)
(catch Exception _
(-> (java.time.Instant/parse ts-str)
.toEpochMilli))))
(defn api-routes [git-store]
(routes
;; 写入接口
(POST "/:entity-type/:id" [entity-type id :as request]
(let [body (:body request)
author {:name (get-in request [:headers "x-author-name"] "Unknown")
:email (get-in request [:headers "x-author-email"] "[email protected]")}
message (get-in request [:headers "x-commit-message"] "Default API commit")]
(try
(let [content (json/generate-string body)
commit (store/commit-entity! git-store (keyword entity-type) id content author message)]
{:status 202
:headers {"Content-Type" "application/json"}
:body (json/generate-string {:commitId (.getName commit)})})
(catch Exception e
{:status 500
:headers {"Content-Type" "application/json"}
:body (json/generate-string {:error "Failed to commit" :details (str e)})}))))
;; 查询接口
(GET "/:entity-type/:id" [entity-type id as-of]
(let [entity (if as-of
(query/get-entity-as-of (keyword entity-type) id (parse-timestamp as-of))
(query/get-entity-latest (keyword entity-type) id))]
(if entity
{:status 200
:headers {"Content-Type" "application/json"}
:body (json/generate-string entity)}
{:status 404
:body "Entity not found"})))
(GET "/:entity-type/:id/history" [entity-type id]
(let [history (query/get-entity-history (keyword entity-type) id)]
{:status 200
:headers {"Content-Type" "application/json"}
:body (json/generate-string history)}))
(route/not-found "Not Found")))
(defn create-app [git-store]
(-> (api-routes git-store)
(wrap-json-body {:keywords? true})
wrap-json-response
wrap-params))
;;; 主函数
(defn -main []
(let [store (store/new-jgit-store "/var/data/git-cqrs-repo")
projector-thread (projector/run-projector! store)
app (create-app store)]
(ring.adapter.jetty/run-jetty app {:port 3000 :join? false})
(log/info "API server started on port 3000")))
API 定义清晰地分离了命令和查询。写操作(POST)与 git-store 交互,而读操作(GET)则与 query-service 交互。as-of 参数的实现直接调用了我们之前写的时序查询函数。
局限性与未来迭代方向
这个基于 Git 和 CQRS 的系统提供了一个非常强大的、具备完整审计能力的数据存储模型,但它并非银弹。在真实项目中,我们必须清楚它的边界。
首先,写入性能是主要瓶颈。对 Git 的每次提交都涉及到文件 IO 和加锁,这决定了它不适用于高吞吐量的写入场景。我们的单线程锁机制虽然保证了安全,但也限制了并发能力。一个可能的优化是引入一个写入队列,将命令批量提交到 Git。
其次,读取模型的可扩展性。目前我们使用内存中的 atom,这对于中小型数据集非常快速,但如果实体数量或历史版本非常多,会消耗大量内存。一个自然的演进方向是将投射器的数据写入一个更专业的读取存储,如 PostgreSQL、Elasticsearch 或专门的时序数据库(如 InfluxDB),这取决于具体的查询需求。
再者,Git 仓库的维护。随着时间推移,Git 仓库会变得非常庞大。需要制定策略来管理它,例如对投射器使用浅克隆(shallow clone),或者定期对旧历史进行归档和剪枝。
最后,投射器的鲁棒性。当前的轮询机制很简单,但在生产环境中需要更可靠的机制来保证事件不丢失、不重复处理。可能需要引入持久化的游标来记录 last-processed-sha,并实现更复杂的错误重试逻辑。
这个架构的真正价值在于,它为那些“审计性”和“可追溯性”高于“写入吞吐量”的场景,提供了一个成本相对较低、概念清晰且技术栈成熟的解决方案。它将开发人员最熟悉的工具之一 —— Git,创造性地应用到了数据存储领域。