使用 Clojure 与 Git 构建一个支持时序查询的 CQRS 系统


我们需要一个存储系统,它不仅要记录当前状态,还要忠实地记录每一次状态变更的完整历史。对于金融风控规则、关键基础设施配置或任何需要强审计性的业务实体,追溯“在某个特定时间点,这个对象的状态是什么”至关重要。传统的数据库通过 updated_at 字段和日志表来解决这个问题,但这往往是事后补救,查询复杂,且无法保证变更的原子性和完整性。

Event Sourcing 是一个优雅的模型,但从零实现一个生产级的 Event Store 成本高昂。这让我们开始思考,我们是否能利用一个早已存在、身经百战的工具来充当我们的事实来源?这个工具就是 Git。Git 本质上是一个内容寻址的、不可变的、带有时间戳的快照日志。这听起来非常像一个 Event Store。

基于这个构想,我们决定用 Clojure 构建一个系统。该系统将 Git 作为写入模型和事件存储,通过 CQRS 模式将变更投射到一个优化的读取模型中,最终通过 RESTful API 提供强大的时序查询能力。Clojure 的不可变数据结构和函数式编程范式与 Git 的工作哲学天然契合。

架构蓝图:命令、提交与投射

整个系统的核心是 CQRS(命令查询职责分离)模式。Git 本身不适合快速随机读取,因此我们必须将读写路径分开。

  1. 写入模型 (Command Model): 所有的状态变更请求(命令)都会被转换成对一个 Git 仓库的提交 (Commit)。每个业务实体(例如一个风控规则)在仓库中表现为一个文件(如 rules/rule-123.json)。修改规则就是提交该文件的新版本。Commit Message 则记录了变更的元数据:谁、为什么、何时。
  2. 事件总线 (Event Bus): Git 的提交日志 git log 天然就是我们的事件流。每个 commit 都是一个不可变的事件。
  3. 读取模型 (Query Model): 一个独立的后台进程(我们称之为“投射器” Projector)会监听 Git 仓库的变更。它会读取新的 commit,解析其中的文件内容,并将其“投射”到一个为快速查询而优化的数据结构中。在这个实现里,我们将使用一个内存中的 Clojure atom 来保存读取模型,以支持时序查询。
  4. API 层 (API Layer): 提供标准的 RESTful 接口。POSTPUT 请求对应写入模型的命令,而 GET 请求则服务于读取模型,包括获取最新状态和指定时间点的历史状态。

下面是这个架构的流程图。

sequenceDiagram
    participant Client
    participant API
    participant CommandHandler
    participant GitRepo as "Git Repository (Write Model)"
    participant Projector as "Projector (Background Process)"
    participant ReadModel as "In-Memory Read Model"

    Client->>+API: POST /api/rules (body: rule data)
    API->>+CommandHandler: handle-update-rule(rule)
    CommandHandler->>+GitRepo: 1. Writes rule-123.json
    CommandHandler->>+GitRepo: 2. git add .
    CommandHandler->>+GitRepo: 3. git commit -m "Update rule 123"
    GitRepo-->>-CommandHandler: Commit successful (SHA: abc1234)
    CommandHandler-->>-API: { "commitId": "abc1234" }
    API-->>-Client: 202 OK

    loop Poll for changes
        Projector->>+GitRepo: git log --since=
        GitRepo-->>-Projector: New commits [abc1234, def5678]
        Projector->>+GitRepo: For each commit, git show :
        GitRepo-->>-Projector: File content at that commit
        Projector->>+ReadModel: update-read-model(commit_sha, timestamp, entity_id, content)
    end

    Client->>+API: GET /api/rules/123?as-of=2023-10-26T12:00:00Z
    API->>+ReadModel: query-rule-as-of("123", "2023-10-26T12:00:00Z")
    ReadModel-->>-API: Rule state at specified time
    API-->>-Client: 200 OK (rule data)

环境与依赖配置

我们的项目使用 deps.edn 进行依赖管理。核心依赖包括用于 Git 操作的 org.eclipse.jgit/org.eclipse.jgit,用于 Web 服务的 ringcompojure,以及 JSON 处理的 cheshire

;; deps.edn
{:paths ["src"]
 :deps  {org.clojure/clojure              {:mvn/version "1.11.1"}
         ;; Git 操作
         org.eclipse.jgit/org.eclipse.jgit  {:mvn/version "6.7.0.202309050840-r"}
         ;; Web 服务器与路由
         ring/ring-core                     {:mvn/version "1.10.0"}
         ring/ring-jetty-adapter            {:mvn/version "1.10.0"}
         ring/ring-json                     {:mvn/version "0.5.1"}
         compojure/compojure                {:mvn/version "1.7.0"}
         ;; JSON 序列化
         cheshire/cheshire                  {:mvn/version "5.12.0"}
         ;; 日志
         org.clojure/tools.logging          {:mvn/version "1.2.4"}
         ch.qos.logback/logback-classic     {:mvn/version "1.4.11"}}}

写入模型:与 JGit 的深度集成

写入模型的核心是安全、原子地与 Git 仓库交互。我们将其抽象为一个 protocol,以便未来可以替换实现(例如,从本地文件系统切换到内存中的 Git 实现进行测试)。

(ns git-cqrs.git-store
  (:require [clojure.java.io :as io]
            [clojure.tools.logging :as log])
  (:import (org.eclipse.jgit.api Git)
           (org.eclipse.jgit.lib Repository PersonIdent)
           (org.eclipse.jgit.revwalk RevWalk)
           (java.io File)
           (java.util.concurrentlocks ReentrantLock)))

;;; 定义一个协议来抽象 Git 操作
(defprotocol IGitStore
  (commit-entity! [this entity-type entity-id content author message] "将实体内容提交到 Git 仓库")
  (read-entity-at [this commit-ish entity-type entity-id] "读取指定 commit 的实体内容")
  (get-commit-log [this since-commit-sha] "获取某个 commit 之后的所有提交日志"))

;;; 基于 JGit 的文件系统实现
(defrecord JGitStore [^Git git-handle ^File repo-path ^ReentrantLock write-lock]
  IGitStore
  (commit-entity! [this entity-type entity-id content author message]
    ;; 在真实项目中,并发写入 Git 仓库是一个严重问题。
    ;; JGit 本身不是完全线程安全的,尤其是对于会修改索引和工作区的操作。
    ;; 这里使用一个简单的 ReentrantLock 来保证写入操作的串行化。
    ;; 在高并发场景下,这会成为瓶颈,可能需要一个专用的 actor 或写入队列来处理。
    (.lock write-lock)
    (try
      (let [repo-dir (.getRepository git-handle)
            file-path (str (name entity-type) "/" (str entity-id) ".json")
            target-file (io/file repo-path file-path)]
        (io/make-parents target-file)
        (spit target-file content)

        (-> (.add git-handle) (.addFilepattern file-path) .call)

        (let [committer (PersonIdent. "System" "[email protected]")
              author-ident (PersonIdent. (:name author) (:email author))]
          (-> (.commit git-handle)
              (.setAuthor author-ident)
              (.setCommitter committer)
              (.setMessage message)
              .call)))
      (catch Exception e
        (log/error e "Failed to commit entity" {:entity-id entity-id})
        ;; 简单的错误处理,真实场景需要更复杂的重试或补偿逻辑
        (throw (ex-info "Git commit failed" {:entity-id entity-id} e)))
      (finally
        (.unlock write-lock))))

  (read-entity-at [this commit-ish entity-type entity-id]
    ;; ... 实现读取逻辑,稍后详述 ...
    )

  (get-commit-log [this since-commit-sha]
    ;; ... 实现日志拉取,稍后详述 ...
    ))


(defn new-jgit-store [path-str]
  (let [repo-path (io/file path-str)
        git-handle (if (.exists (io/file repo-path ".git"))
                     (Git/open repo-path)
                     (do
                       (log/info "Initializing new Git repository at" path-str)
                       (Git/init)
                       (.setDirectory (io/file path-str))
                       (.call)))]
    (map->JGitStore {:git-handle git-handle
                     :repo-path repo-path
                     :write-lock (ReentrantLock.)})))

;;; 用法示例
;; (def store (new-jgit-store "/tmp/my-git-db"))
;; (commit-entity! store :rules "rule-123" "{ \"field\": \"amount\", \"op\": \">\", \"value\": 1000 }" {:name "admin" :email "[email protected]"} "Initial rule creation")

这段代码的核心在于 commit-entity! 函数。我们通过一个 ReentrantLock 保证了对 Git 写操作的线程安全,这是一个务实但有性能局限的方案。在生产环境中,任何可能导致仓库状态不一致的并发写都必须被阻止。

读取模型:投射器与时序数据结构

投射器的职责是从 Git 拉取最新的变更,并更新内存中的读取模型。我们的读取模型需要支持“as-of”查询,即查询任意时间点的状态。一个 sorted-map 是实现此功能的理想数据结构。

我们的读取模型 state 将会是一个 atom,其结构如下:

{
  :rules { ; entity-type
    "rule-123" { ; entity-id
      ;; 时间戳作为 key,自动排序
      ;; value 是 commit SHA 和解析后的数据
      1672531200000 {:commit "abc1234", :data {...}},
      1672534800000 {:commit "def5678", :data {...}}
    }
  },
  :last-processed-sha "def5678" ; 最后处理的 commit SHA
}

1. 实现 Git 日志拉取和内容读取

我们需要在 JGitStore 中补全读取相关的函数。

;; 在 JGitStore record 中添加/修改
(defrecord JGitStore [^Git git-handle ^File repo-path ^ReentrantLock write-lock]
  IGitStore
  (commit-entity! [this entity-type entity-id content author message]
    ;; ... 如上 ...
    )

  (read-entity-at [this commit-ish entity-type entity-id]
    (let [repo (.getRepository git-handle)
          file-path (str (name entity-type) "/" (str entity-id) ".json")
          commit-id (.resolve repo commit-ish)]
      (when commit-id
        (with-open [rev-walk (RevWalk. repo)
                    tree-walk (org.eclipse.jgit.treewalk.TreeWalk. repo)]
          (let [commit (.parseCommit rev-walk commit-id)
                tree (.getTree commit)]
            (.addTree tree-walk tree)
            (.setRecursive tree-walk true)
            (.setFilter tree-walk (org.eclipse.jgit.treewalk.filter.PathFilter/create file-path))
            (when (.next tree-walk)
              (let [object-id (.getObjectId tree-walk 0)
                    loader (.open repo object-id)]
                (String. (.getBytes loader) "UTF-8"))))))))

  (get-commit-log [this since-commit-sha]
    (let [repo (.getRepository git-handle)
          log-cmd (.log git-handle)]
      ;; 如果提供了 since-commit-sha,就从那个 commit 的下一个开始
      (when-let [start-commit (and since-commit-sha (.resolve repo since-commit-sha))]
        (.not log-cmd start-commit))
      
      (->> (.call log-cmd)
           ;; JGit 返回的是最新的 commit 在前,我们需要反转它以保证事件处理的顺序
           reverse 
           (map (fn [commit]
                  {:sha (.getName commit)
                   :timestamp (* 1000 (.getCommitTime commit)) ; 转换为毫秒
                   :message (.getFullMessage commit)
                   :author {:name (-> commit .getAuthorIdent .getName)
                            :email (-> commit .getAuthorIdent .getEmailAddress)}}))))))

2. 投射器进程

投射器是一个简单的后台循环,它定期检查新的 commit 并更新 atom 状态。

(ns git-cqrs.projector
  (:require [git-cqrs.git-store :as store]
            [cheshire.core :as json]
            [clojure.tools.logging :as log]))

;; 全局的读取模型状态
(def read-model (atom {:last-processed-sha nil}))

(defn- parse-entity-from-path [file-path]
  (let [parts (clojure.string/split file-path #"/")]
    (when (>= (count parts) 2)
      {:type (keyword (first parts))
       :id (-> (second parts)
               (clojure.string/replace #".json$" ""))})))

(defn- process-commit [git-store commit]
  (let [repo (.getRepository (:git-handle git-store))]
    (with-open [rev-walk (RevWalk. repo)
                tree-walk (org.eclipse.jgit.treewalk.TreeWalk. repo)]
      (let [commit-obj (.parseCommit rev-walk (.resolve repo (:sha commit)))
            tree (.getTree commit-obj)
            parent-tree (when (pos? (.getParentCount commit-obj))
                          (->> 0 (.getParent commit-obj) (.parseCommit rev-walk) .getTree))]
        (.addTree tree-walk tree)
        (when parent-tree (.addTree tree-walk parent-tree))
        (.setRecursive tree-walk true)
        
        ;; 使用 DiffFormatter 找出变更的文件
        (let [diff-formatter (org.eclipse.jgit.diff.DiffFormatter. org.eclipse.jgit.util.io.DisabledOutputStream/INSTANCE)]
          (.setRepository diff-formatter repo)
          (->> (.scan diff-formatter (when parent-tree parent-tree) tree)
               (map (fn [diff-entry]
                      ;; 我们只关心修改和新增的文件
                      (let [path (if (= (.getChangeType diff-entry) org.eclipse.jgit.diff.DiffEntry$ChangeType/DELETE)
                                   (.getOldPath diff-entry)
                                   (.getNewPath diff-entry))]
                        path)))
               (distinct)
               (map (fn [path]
                      (when-let [{:keys [type id]} (parse-entity-from-path path)]
                        (let [content (store/read-entity-at git-store (:sha commit) type id)
                              data (try (json/parse-string content true) (catch Exception _ nil))]
                          (when data
                            (log/info "Projecting entity update" {:type type :id id :commit (:sha commit)})
                            (swap! read-model
                                   (fn [current-model]
                                     (-> current-model
                                         (assoc-in [type id (:timestamp commit)] {:commit (:sha commit) :data data})
                                         (assoc :last-processed-sha (:sha commit))))))))))
               doall))))))

(defn run-projector! [git-store]
  (log/info "Starting projector...")
  (future
    (while (not (.. Thread currentThread isInterrupted))
      (try
        (let [last-sha (get @read-model :last-processed-sha)
              new-commits (store/get-commit-log git-store last-sha)]
          (when (seq new-commits)
            (log/info "Found" (count new-commits) "new commits to project.")
            (doseq [commit new-commits]
              (process-commit git-store commit))))
        (catch Exception e
          (log/error e "Error in projector loop")))
      (Thread/sleep 5000)))) ;; 轮询周期,生产环境应使用更优雅的机制,如文件系统通知

这个投射器实现了一个关键点:它通过比较 commit 与其父 commit 的 tree 对象来找出变更的文件。这比遍历每个 commit 中的所有文件要高效得多。

查询服务与 RESTful API

最后一步是暴露 HTTP 接口。我们将使用 Compojure 来定义路由。

1. 查询逻辑

查询处理器直接从 read-model 这个 atom 中读取数据。

(ns git-cqrs.query-service
  (:require [git-cqrs.projector :refer [read-model]]))

(defn find-latest-before [sorted-versions timestamp]
  ;; rseq a sorted-map returns a reverse chronological sequence
  (->> (rseq sorted-versions)
       (filter (fn [[ts _]] (<= ts timestamp)))
       first))

(defn get-entity-as-of [entity-type entity-id timestamp]
  (let [versions (get-in @read-model [entity-type entity-id])]
    (when versions
      (if-let [[_ value] (find-latest-before versions timestamp)]
        (:data value)
        nil))))

(defn get-entity-latest [entity-type entity-id]
  (let [versions (get-in @read-model [entity-type entity-id])]
    (when versions
      (-> versions last val :data))))

(defn get-entity-history [entity-type entity-id]
  (let [versions (get-in @read-model [entity-type entity-id])]
    (map (fn [[ts {:keys [commit data]}]]
           {:timestamp ts :commit commit :data data})
         versions)))

find-latest-before 是时序查询的核心。它利用 sorted-map 的特性,高效地找到在指定时间点之前(或恰好在该时间点)的最后一个版本。

2. API 路由

(ns git-cqrs.api
  (:require [compojure.core :refer :all]
            [compojure.route :as route]
            [ring.middleware.json :refer [wrap-json-response wrap-json-body]]
            [ring.middleware.params :refer [wrap-params]]
            [cheshire.core :as json]
            [git-cqrs.git-store :as store]
            [git-cqrs.query-service :as query]))

(defn- parse-timestamp [ts-str]
  (try
    (Long/parseLong ts-str)
    (catch Exception _
      (-> (java.time.Instant/parse ts-str)
          .toEpochMilli))))

(defn api-routes [git-store]
  (routes
    ;; 写入接口
    (POST "/:entity-type/:id" [entity-type id :as request]
      (let [body (:body request)
            author {:name (get-in request [:headers "x-author-name"] "Unknown")
                    :email (get-in request [:headers "x-author-email"] "[email protected]")}
            message (get-in request [:headers "x-commit-message"] "Default API commit")]
        (try
          (let [content (json/generate-string body)
                commit (store/commit-entity! git-store (keyword entity-type) id content author message)]
            {:status 202
             :headers {"Content-Type" "application/json"}
             :body (json/generate-string {:commitId (.getName commit)})})
          (catch Exception e
            {:status 500
             :headers {"Content-Type" "application/json"}
             :body (json/generate-string {:error "Failed to commit" :details (str e)})}))))

    ;; 查询接口
    (GET "/:entity-type/:id" [entity-type id as-of]
      (let [entity (if as-of
                     (query/get-entity-as-of (keyword entity-type) id (parse-timestamp as-of))
                     (query/get-entity-latest (keyword entity-type) id))]
        (if entity
          {:status 200
           :headers {"Content-Type" "application/json"}
           :body (json/generate-string entity)}
          {:status 404
           :body "Entity not found"})))

    (GET "/:entity-type/:id/history" [entity-type id]
      (let [history (query/get-entity-history (keyword entity-type) id)]
        {:status 200
         :headers {"Content-Type" "application/json"}
         :body (json/generate-string history)}))

    (route/not-found "Not Found")))

(defn create-app [git-store]
  (-> (api-routes git-store)
      (wrap-json-body {:keywords? true})
      wrap-json-response
      wrap-params))

;;; 主函数
(defn -main []
  (let [store (store/new-jgit-store "/var/data/git-cqrs-repo")
        projector-thread (projector/run-projector! store)
        app (create-app store)]
    (ring.adapter.jetty/run-jetty app {:port 3000 :join? false})
    (log/info "API server started on port 3000")))

API 定义清晰地分离了命令和查询。写操作(POST)与 git-store 交互,而读操作(GET)则与 query-service 交互。as-of 参数的实现直接调用了我们之前写的时序查询函数。

局限性与未来迭代方向

这个基于 Git 和 CQRS 的系统提供了一个非常强大的、具备完整审计能力的数据存储模型,但它并非银弹。在真实项目中,我们必须清楚它的边界。

首先,写入性能是主要瓶颈。对 Git 的每次提交都涉及到文件 IO 和加锁,这决定了它不适用于高吞吐量的写入场景。我们的单线程锁机制虽然保证了安全,但也限制了并发能力。一个可能的优化是引入一个写入队列,将命令批量提交到 Git。

其次,读取模型的可扩展性。目前我们使用内存中的 atom,这对于中小型数据集非常快速,但如果实体数量或历史版本非常多,会消耗大量内存。一个自然的演进方向是将投射器的数据写入一个更专业的读取存储,如 PostgreSQL、Elasticsearch 或专门的时序数据库(如 InfluxDB),这取决于具体的查询需求。

再者,Git 仓库的维护。随着时间推移,Git 仓库会变得非常庞大。需要制定策略来管理它,例如对投射器使用浅克隆(shallow clone),或者定期对旧历史进行归档和剪枝。

最后,投射器的鲁棒性。当前的轮询机制很简单,但在生产环境中需要更可靠的机制来保证事件不丢失、不重复处理。可能需要引入持久化的游标来记录 last-processed-sha,并实现更复杂的错误重试逻辑。

这个架构的真正价值在于,它为那些“审计性”和“可追溯性”高于“写入吞吐量”的场景,提供了一个成本相对较低、概念清晰且技术栈成熟的解决方案。它将开发人员最熟悉的工具之一 —— Git,创造性地应用到了数据存储领域。


  目录