从WebAuthn无密码认证到Chakra UI实时更新的全栈事件溯源架构实践


一个棘手的问题摆在面前:如何构建一个多人协作的操作平台,既要保证操作的绝对安全,又要让所有在线用户的界面实时、精准地同步状态,同时还需保留一份不可篡改的完整操作审计日志。传统的 CRUD + WebSocket 方案能勉强应付,但在真实项目中,状态同步的复杂性、数据冲突的解决、以及审计日志的可靠性很快会成为维护的噩梦。尤其是在金融或运维这类高风险领域,一个操作的指令、执行过程、最终结果必须被精确记录。

我们决定彻底放弃传统的状态模型,转而拥抱事件溯源(Event Sourcing)。系统的核心不再是当前状态,而是一系列不可变的领域事件(Domain Events)。状态只是这些事件按顺序聚合的结果。这个决定引出了一系列新的技术挑战:如何安全地接收产生事件的指令(Command)?如何高效地将事件推送给前端?前端又该如何响应式地处理这些事件流?

这便是我们这次技术实践的起点。我们将构建一个端到端的解决方案,涵盖从前端到后端的完整流程:

  1. 认证层: 采用 WebAuthn 实现无密码认证,确保指令来源的安全。
  2. 网关层: 构建一个轻量级 API Gateway,负责处理指令、管理 SSE 连接。
  3. 核心逻辑层: 实现一个基础的 Event Sourcing 核心,包括 Command、Event 和 Aggregate。
  4. 推送层: 使用 Server-Sent Events (SSE) 将事件流实时推送到客户端。
  5. 展现层: 利用 Chakra UI 和 React Hooks 优雅地消费事件流,构建一个响应式的用户界面。

架构蓝图

在深入代码之前,我们先用流程图来定义整个系统的数据流和组件交互。这有助于理清思路,尤其是在处理这种跨越多个技术栈的复杂场景时。

sequenceDiagram
    participant User as 用户
    participant Frontend as Chakra UI 前端
    participant Gateway as Node.js API Gateway
    participant EventStore as 事件存储
    participant Auth as WebAuthn 服务

    User->>Frontend: 1. 触发登录/操作
    Frontend->>Auth: 2. 请求 WebAuthn Challenge
    Auth-->>Frontend: 3. 返回 Challenge
    User->>Frontend: 4. 使用生物识别签名
    Frontend->>Auth: 5. 发送签名后的 Challenge
    Auth-->>Frontend: 6. 验证成功,下发 JWT
    Frontend->>Frontend: 7. 存储 JWT

    Note over User, EventStore: 用户已登录,开始执行操作

    User->>Frontend: 8. 点击“部署服务”按钮
    Frontend->>Gateway: 9. 发起 POST /commands (携带JWT和Command)
    Gateway->>Gateway: 10. 验证 JWT
    Gateway->>EventStore: 11. 执行 Command, 生成 Events
    EventStore->>EventStore: 12. 持久化 Events
    EventStore-->>Gateway: 13. 返回成功
    Gateway->>Gateway: 14. 将新 Events 广播给所有 SSE 连接
    Gateway-->>Frontend: 15. 通过已建立的 SSE 连接推送 Event
    Frontend->>Frontend: 16. 接收 Event, 更新本地状态
    User->>Frontend: 17. 界面实时更新

这个流程清晰地展示了认证、指令处理和实时更新的分离。WebAuthn 负责入口安全,API Gateway 是指令和事件的中枢,Event Store 是事实的唯一来源,而 SSE 则是连接后端事实与前端展现的管道。

核心实现:事件溯源与 API 网关

我们的后端将使用 Node.js 和 Express 构建。它不仅是 API Gateway,也包含了事件溯源的核心逻辑。

1. 事件存储 (Event Store)

在生产环境中,事件存储通常会使用专门的数据库如 EventStoreDB 或基于 PostgreSQL/Kafka 构建。为了聚焦核心逻辑,我们先在内存中实现一个简单的 Event Store。这里的关键在于,它只允许追加事件,绝不允许修改或删除。

src/event-store.js

// src/event-store.js

import { EventEmitter } from 'events';

// 一个非常基础的内存事件存储,用于演示
// 生产环境应替换为持久化存储,如PostgreSQL, Kafka, or EventStoreDB
class InMemoryEventStore {
    constructor() {
        this.events = []; // 存储所有事件的数组
        this.emitter = new EventEmitter(); // 用于在事件保存后发出通知
    }

    /**
     * 保存事件到存储中
     * @param {string} aggregateId - 聚合ID
     * @param {Array<object>} events - 要保存的事件数组
     * @param {number} expectedVersion - 预期的聚合版本号,用于乐观并发控制
     * @throws {Error} 如果版本冲突
     */
    saveEvents(aggregateId, events, expectedVersion) {
        const currentVersion = this.getAggregateVersion(aggregateId);

        // 简单的乐观并发检查
        if (currentVersion !== expectedVersion) {
            console.error(`Concurrency error for aggregate ${aggregateId}. Expected version ${expectedVersion}, but found ${currentVersion}.`);
            throw new Error('Concurrency conflict: Aggregate has been modified by another operation.');
        }
        
        // 为每个事件附加元数据
        let version = currentVersion;
        for (const event of events) {
            version++;
            event.meta = {
                aggregateId,
                version,
                timestamp: new Date().toISOString()
            };
            this.events.push(event);
        }

        // 成功保存后,通过事件发射器通知监听者
        // 这是将事件推送到SSE的关键钩子
        this.emitter.emit('events_saved', events);
        console.log(`[EventStore] Saved ${events.length} events for aggregate ${aggregateId}. New version is ${version}.`);
    }

    /**
     * 根据聚合ID获取事件历史
     * @param {string} aggregateId 
     * @returns {Array<object>}
     */
    getEventsForAggregate(aggregateId) {
        return this.events.filter(e => e.meta.aggregateId === aggregateId);
    }

    /**
     * 获取指定聚合的当前版本号
     * @param {string} aggregateId 
     * @returns {number}
     */
    getAggregateVersion(aggregateId) {
        const aggregateEvents = this.getEventsForAggregate(aggregateId);
        if (aggregateEvents.length === 0) {
            return 0; // 如果没有事件,版本号为0
        }
        return aggregateEvents[aggregateEvents.length - 1].meta.version;
    }

    /**
     * 订阅事件保存的通知
     * @param {function} listener 
     */
    subscribe(listener) {
        this.emitter.on('events_saved', listener);
    }
}

// 使用单例模式,确保整个应用共享一个事件存储实例
export const eventStore = new InMemoryEventStore();

这个实现包含了几个关键点:

  • 乐观并发控制: expectedVersion 参数是实现乐观锁的关键。在读取一个聚合、处理指令、再保存新事件的整个过程中,如果聚合的版本发生了变化,意味着有其他操作捷足先登,本次操作必须失败并重试。
  • 事件发射器: EventEmitter 是解耦的关键。当事件被成功保存后,它会发出一个 events_saved 通知。我们的 SSE 服务将监听这个通知,以便将新事件广播出去。

2. 聚合 (Aggregate) 与指令 (Command)

我们以一个“服务部署”场景为例。Deployment 是我们的聚合根,它封装了状态和业务逻辑。

src/deployment-aggregate.js

// src/deployment-aggregate.js

import { eventStore } from './event-store.js';

// 定义事件类型
const EventTypes = {
    DEPLOYMENT_STARTED: 'DeploymentStarted',
    DEPLOYMENT_SUCCEEDED: 'DeploymentSucceeded',
    DEPLOYMENT_FAILED: 'DeploymentFailed',
};

class DeploymentAggregate {
    constructor(id) {
        this.id = id;
        this.version = 0;
        this.status = 'Pending';
        this.serviceName = null;
        this.deployedBy = null;
        this.history = [];
    }

    // 内部方法,根据事件更新聚合状态
    _apply(event) {
        switch (event.type) {
            case EventTypes.DEPLOYMENT_STARTED:
                this.status = 'InProgress';
                this.serviceName = event.payload.serviceName;
                this.deployedBy = event.payload.user;
                break;
            case EventTypes.DEPLOYMENT_SUCCEEDED:
                this.status = 'Succeeded';
                break;
            case EventTypes.DEPLOYMENT_FAILED:
                this.status = 'Failed';
                this.history.push({ status: 'Failed', reason: event.payload.reason });
                break;
        }
        this.version = event.meta.version;
        this.history.push({ status: this.status, timestamp: event.meta.timestamp });
    }

    // 从事件历史中重建聚合状态
    static rehydrate(id, events) {
        const aggregate = new DeploymentAggregate(id);
        events.forEach(event => aggregate._apply(event));
        return aggregate;
    }

    // --- Command Handlers ---

    // 处理“开始部署”指令
    startDeployment(command) {
        if (this.status !== 'Pending') {
            throw new Error('Deployment has already been started.');
        }
        
        // 指令验证成功后,生成事件
        const event = {
            type: EventTypes.DEPLOYMENT_STARTED,
            payload: {
                serviceName: command.serviceName,
                user: command.user,
            },
        };

        return [event]; // 返回一个事件数组
    }
    
    // 处理“部署成功”指令
    succeedDeployment() {
        if (this.status !== 'InProgress') {
            throw new Error('Deployment is not in progress.');
        }

        const event = { type: EventTypes.DEPLOYMENT_SUCCEEDED, payload: {} };
        return [event];
    }
    
    // 处理“部署失败”指令
    failDeployment(command) {
        if (this.status !== 'InProgress') {
            throw new Error('Deployment is not in progress.');
        }

        const event = { type: EventTypes.DEPLOYMENT_FAILED, payload: { reason: command.reason } };
        return [event];
    }
}

// Command Handler Function - 这是与外界交互的入口
export function handleDeploymentCommand(command) {
    // 1. 从事件存储加载历史事件
    const events = eventStore.getEventsForAggregate(command.aggregateId);
    
    // 2. 重建聚合的当前状态
    const deployment = DeploymentAggregate.rehydrate(command.aggregateId, events);
    
    let newEvents;
    // 3. 根据指令类型调用相应的处理方法
    switch (command.type) {
        case 'StartDeployment':
            newEvents = deployment.startDeployment(command);
            break;
        case 'SucceedDeployment':
            newEvents = deployment.succeedDeployment();
            break;
        case 'FailDeployment':
            newEvents = deployment.failDeployment(command);
            break;
        default:
            throw new Error(`Unknown command type: ${command.type}`);
    }

    // 4. 将新生成的事件保存到事件存储
    // 传递当前版本号用于并发控制
    if (newEvents && newEvents.length > 0) {
        eventStore.saveEvents(command.aggregateId, newEvents, deployment.version);
    }
}

这个聚合的实现遵循了 Event Sourcing 的核心原则:

  • 状态由事件改变: 聚合的状态只能通过 _apply 方法在内部根据事件来修改。
  • 指令生成事件: 公开的指令处理方法 (startDeployment 等) 负责验证业务规则,然后生成事件,但它们自身不改变状态。
  • 加载与重建: handleDeploymentCommand 函数展示了标准流程:加载历史 -> 重建状态 -> 执行指令 -> 保存新事件。

3. API Gateway: 指令端点与 SSE 端点

现在我们来构建 Express 服务器,它将承载我们的 API Gateway。

src/server.js

// src/server.js
import express from 'express';
import cors from 'cors';
import bodyParser from 'body-parser';
import { v4 as uuidv4 } from 'uuid';
import { handleDeploymentCommand } from './deployment-aggregate.js';
import { eventStore } from './event-store.js';
import { setupAuthRoutes } from './auth.js'; // WebAuthn路由,稍后实现
import { authMiddleware } from './auth.js'; // JWT验证中间件

const app = express();
const PORT = process.env.PORT || 4000;

app.use(cors());
app.use(bodyParser.json());

// --- WebAuthn 认证路由 ---
// 这个函数会添加 /auth/register-challenge, /auth/register 等路由
setupAuthRoutes(app);

// --- SSE (Server-Sent Events) ---
let clients = [];

// 事件监听器,当有新事件保存时,广播给所有连接的客户端
eventStore.subscribe((events) => {
    console.log(`[SSE] Broadcasting ${events.length} new events to ${clients.length} clients.`);
    const eventData = `data: ${JSON.stringify(events)}\n\n`;
    clients.forEach(client => client.res.write(eventData));
});

app.get('/events', authMiddleware, (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders(); // 立刻发送头部信息

    const clientId = uuidv4();
    const newClient = { id: clientId, res };
    clients.push(newClient);
    console.log(`[SSE] Client ${clientId} connected. Total clients: ${clients.length}`);
    
    // 当客户端关闭连接时,将其从列表中移除
    req.on('close', () => {
        clients = clients.filter(client => client.id !== clientId);
        console.log(`[SSE] Client ${clientId} disconnected. Total clients: ${clients.length}`);
    });
});


// --- Command Endpoint ---
app.post('/commands', authMiddleware, (req, res) => {
    try {
        const command = req.body;
        // 在真实项目中,user应该从JWT中解析出来,确保安全
        command.user = req.user.username; 
        
        console.log(`[Command] Received command: ${command.type} for aggregate ${command.aggregateId}`);
        handleDeploymentCommand(command);
        res.status(202).json({ message: 'Command accepted.' });
    } catch (error) {
        console.error(`[Command] Error processing command: ${error.message}`);
        // 区分业务错误和系统错误
        if (error.message.includes('Concurrency conflict')) {
            res.status(409).json({ error: error.message }); // 409 Conflict
        } else {
            res.status(400).json({ error: error.message }); // 400 Bad Request for business rule violations
        }
    }
});

app.listen(PORT, () => {
    console.log(`Server is running on http://localhost:${PORT}`);
});

这个服务器是整个系统的中枢:

  • /events 端点: 这是 SSE 的核心。它维持一个长连接,并将客户端的 response 对象保存在 clients 数组中。当 eventStore 发出通知时,它会遍历所有客户端并写入格式化的事件数据。一个常见的坑是忘记设置正确的 HTTP Headers,特别是 Content-Type: text/event-stream
  • /commands 端点: 这是接收指令的入口。它受到 authMiddleware 的保护,确保只有经过认证的用户才能提交指令。它将请求体中的指令传递给聚合的命令处理器,并处理可能发生的业务异常或并发冲突。
  • 客户端管理: 当前的 clients 数组实现很简单。在生产环境中,如果 API Gateway 是多实例部署的,这里必须使用外部消息中间件(如 Redis Pub/Sub)来广播事件,否则一个实例产生的事件无法通知到连接在其他实例上的客户端。

安全基石:WebAuthn 无密码认证

在高风险操作平台,密码认证的弱点是不可接受的。WebAuthn (FIDO2) 通过公钥加密提供了防钓鱼、防数据泄露的强大认证机制。我们使用 simple-webauthn 库来简化实现。

src/auth.js

// src/auth.js
import {
    generateRegistrationOptions,
    verifyRegistrationResponse,
    generateAuthenticationOptions,
    verifyAuthenticationResponse,
} from '@simplewebauthn/server';
import jwt from 'jsonwebtoken';

// 简单内存存储用户和认证器
// 生产环境应使用数据库
const users = {}; 
const JWT_SECRET = 'a-very-secret-key-for-our-project';
const RP_ID = 'localhost'; // Relying Party ID,必须与前端域名匹配
const RP_NAME = 'Realtime Event Sourcing Demo';

// 模拟数据库操作
const db = {
    getUser: (username) => users[username],
    saveUser: (user) => { users[user.username] = user; },
    getAuthenticators: (username) => users[username]?.authenticators || [],
    saveAuthenticator: (username, authenticator) => {
        if (!users[username].authenticators) {
            users[username].authenticators = [];
        }
        users[username].authenticators.push(authenticator);
    }
};

export function setupAuthRoutes(app) {
    // 1. 生成注册选项
    app.post('/auth/register-challenge', (req, res) => {
        const { username } = req.body;
        if (!username || db.getUser(username)) {
            return res.status(400).json({ error: 'Username is required or already exists.' });
        }
        
        db.saveUser({ username, authenticators: [] });

        const options = generateRegistrationOptions({
            rpName: RP_NAME,
            rpID: RP_ID,
            userName: username,
            attestationType: 'none',
        });
        
        // 挑战需要临时保存,以便后续验证
        users[username].currentChallenge = options.challenge;
        res.json(options);
    });

    // 2. 验证注册响应
    app.post('/auth/register', async (req, res) => {
        const { username, response } = req.body;
        const user = db.getUser(username);
        if (!user) return res.status(404).send('User not found');

        try {
            const verification = await verifyRegistrationResponse({
                response,
                expectedChallenge: user.currentChallenge,
                expectedOrigin: 'http://localhost:3000',
                expectedRPID: RP_ID,
            });

            if (verification.verified) {
                db.saveAuthenticator(username, verification.registrationInfo);
            }
            res.json({ verified: verification.verified });
        } catch (error) {
            res.status(400).json({ error: error.message });
        }
    });

    // 3. 生成认证选项
    app.post('/auth/login-challenge', (req, res) => {
        const { username } = req.body;
        const user = db.getUser(username);
        if (!user) return res.status(404).json({ error: 'User not found.' });

        const options = generateAuthenticationOptions({
            allowCredentials: user.authenticators.map(auth => ({
                id: auth.credentialID,
                type: 'public-key',
            })),
        });

        user.currentChallenge = options.challenge;
        res.json(options);
    });

    // 4. 验证认证响应
    app.post('/auth/login', async (req, res) => {
        const { username, response } = req.body;
        const user = db.getUser(username);
        if (!user) return res.status(404).send('User not found');

        const authenticator = user.authenticators.find(
            auth => auth.credentialID.toString('base64url') === response.id
        );
        if (!authenticator) return res.status(400).send('Authenticator not found');

        try {
            const verification = await verifyAuthenticationResponse({
                response,
                expectedChallenge: user.currentChallenge,
                expectedOrigin: 'http://localhost:3000',
                expectedRPID: RP_ID,
                authenticator,
            });

            if (verification.verified) {
                const token = jwt.sign({ username }, JWT_SECRET, { expiresIn: '1h' });
                res.json({ verified: true, token });
            } else {
                res.status(401).json({ verified: false });
            }
        } catch (error) {
            res.status(400).json({ error: error.message });
        }
    });
}

// JWT 验证中间件
export function authMiddleware(req, res, next) {
    const authHeader = req.headers.authorization;
    if (!authHeader || !authHeader.startsWith('Bearer ')) {
        return res.sendStatus(401);
    }

    const token = authHeader.split(' ')[1];
    jwt.verify(token, JWT_SECRET, (err, user) => {
        if (err) {
            return res.sendStatus(403);
        }
        req.user = user;
        next();
    });
}

WebAuthn 的流程虽然多步,但逻辑是固定的:challenge -> response -> verification。成功登录后,我们签发一个标准的 JWT,后续的 /commands/events 端点都将使用这个 JWT 进行身份验证和授权。这是一个常见的错误点:开发者有时会忘记保护 SSE 连接,导致未经授权的用户也能订阅到敏感的事件流。

前端实现:Chakra UI 与实时事件消费

前端使用 React 和 Chakra UI。Chakra UI 提供了高质量的组件和优秀的开发体验,而 React Hooks (useState, useEffect, useReducer) 则是管理和响应事件流的完美工具。

1. 前端状态与事件处理

我们将使用 useReducer 来管理前端的状态。useReducer 非常适合事件驱动的场景,因为 reducer 函数本身就是一个状态转换机,它的逻辑 (state, action) => newState 与我们聚合中的 (state, event) => newState 思想如出一辙。

src/App.js

// src/App.js
import React, { useEffect, useReducer, createContext, useContext } from 'react';
import { ChakraProvider, Box, Heading, VStack } from '@chakra-ui/react';
import { AuthComponent } from './components/AuthComponent';
import { DeploymentDashboard } from './components/DeploymentDashboard';
import { apiClient } from './apiClient';

// 状态管理
const initialState = {
    isAuthenticated: false,
    user: null,
    token: null,
    deployments: {}, // 使用对象存储,以ID为键
};

function appReducer(state, action) {
    switch (action.type) {
        case 'LOGIN_SUCCESS':
            return {
                ...state,
                isAuthenticated: true,
                user: action.payload.user,
                token: action.payload.token,
            };
        case 'LOGOUT':
            return { ...initialState };
        case 'EVENTS_RECEIVED':
            // 这是一个关键的reducer,用于处理从SSE收到的事件
            const newDeployments = { ...state.deployments };
            for (const event of action.payload.events) {
                const { aggregateId } = event.meta;
                const existing = newDeployments[aggregateId] || { id: aggregateId, status: 'Pending', history: [] };

                // 根据事件类型更新本地投影
                switch (event.type) {
                    case 'DeploymentStarted':
                        existing.status = 'InProgress';
                        existing.serviceName = event.payload.serviceName;
                        existing.deployedBy = event.payload.user;
                        break;
                    case 'DeploymentSucceeded':
                        existing.status = 'Succeeded';
                        break;
                    case 'DeploymentFailed':
                        existing.status = 'Failed';
                        existing.reason = event.payload.reason;
                        break;
                    default: break;
                }
                existing.history.push({ status: existing.status, timestamp: event.meta.timestamp });
                newDeployments[aggregateId] = existing;
            }
            return { ...state, deployments: newDeployments };
        default:
            return state;
    }
}

const AppContext = createContext();
export const useAppContext = () => useContext(AppContext);

function App() {
    const [state, dispatch] = useReducer(appReducer, initialState);

    // 设置API客户端的认证token
    useEffect(() => {
        apiClient.defaults.headers.common['Authorization'] = state.token ? `Bearer ${state.token}` : '';
    }, [state.token]);

    // 核心:建立和管理SSE连接
    useEffect(() => {
        if (!state.isAuthenticated) return;

        console.log('[SSE] Attempting to connect...');
        const eventSource = new EventSource(`http://localhost:4000/events?token=${state.token}`);

        eventSource.onopen = () => {
            console.log('[SSE] Connection opened.');
        };

        eventSource.onmessage = (event) => {
            const parsedEvents = JSON.parse(event.data);
            console.log('[SSE] Events received:', parsedEvents);
            dispatch({ type: 'EVENTS_RECEIVED', payload: { events: parsedEvents } });
        };

        eventSource.onerror = (err) => {
            console.error('[SSE] Connection error:', err);
            eventSource.close();
        };

        // 组件卸载时关闭连接
        return () => {
            console.log('[SSE] Closing connection.');
            eventSource.close();
        };
    }, [state.isAuthenticated, state.token]);


    return (
        <ChakraProvider>
            <AppContext.Provider value={{ state, dispatch }}>
                <Box p={8}>
                    <VStack spacing={8}>
                        <Heading>Real-time Deployment Dashboard</Heading>
                        {!state.isAuthenticated ? (
                            <AuthComponent />
                        ) : (
                            <DeploymentDashboard />
                        )}
                    </VStack>
                </Box>
            </AppContext.Provider>
        </ChakraProvider>
    );
}

export default App;

前端应用的核心逻辑:

  • Reducer as Projection: appReducer 担当了客户端的“投影”(Projection)角色。它接收来自后端的事件,并将它们应用到本地的状态树上,从而构建出一个只读的视图模型 (state.deployments)。这本质上是 CQRS 模式在前端的体现。
  • SSE 连接管理: useEffect hook 完美地处理了 SSE 连接的生命周期。它在用户认证后建立连接,并在用户登出或组件卸载时断开连接。一个需要注意的细节是,我们直接将 token 作为查询参数传递,因为 EventSource API 不支持设置自定义 Headers。因此,后端需要调整 authMiddleware 来同时从 Header 和查询参数中读取 token。

2. UI 组件:认证与仪表盘

AuthComponent 会使用 @simplewebauthn/browser 库来与浏览器和安全密钥交互,其代码与库的官方示例类似,主要负责调用 API 和处理浏览器的 navigator.credentials 调用。

我们重点看 DeploymentDashboard,它负责展示状态并发送指令。

src/components/DeploymentDashboard.js

// src/components/DeploymentDashboard.js
import React, { useState } from 'react';
import { useAppContext } from '../App';
import {
    Box, Button, VStack, HStack, Text, Input,
    Table, Thead, Tbody, Tr, Th, Td, Tag,
    useToast
} from '@chakra-ui/react';
import { v4 as uuidv4 } from 'uuid';
import { apiClient } from '../apiClient';

export function DeploymentDashboard() {
    const { state, dispatch } = useAppContext();
    const [serviceName, setServiceName] = useState('');
    const toast = useToast();

    const handleStartDeployment = async () => {
        if (!serviceName) {
            toast({ title: 'Service name is required', status: 'warning', duration: 3000, isClosable: true });
            return;
        }

        const command = {
            aggregateId: uuidv4(), // 为新部署生成唯一ID
            type: 'StartDeployment',
            serviceName: serviceName,
        };

        try {
            await apiClient.post('/commands', command);
            toast({ title: 'Deployment command sent', status: 'success', duration: 2000 });
            setServiceName('');
        } catch (error) {
            toast({ title: 'Failed to send command', description: error.response?.data?.error || error.message, status: 'error' });
        }
    };
    
    // 模拟部署成功或失败
    const simulateOutcome = async (aggregateId, success) => {
        const command = {
            aggregateId,
            type: success ? 'SucceedDeployment' : 'FailDeployment',
            ...(success ? {} : { reason: 'Simulated infrastructure failure.' })
        };
        try {
            await apiClient.post('/commands', command);
        } catch (error) {
             toast({ title: 'Failed to simulate outcome', description: error.response?.data?.error || error.message, status: 'error' });
        }
    }

    const deployments = Object.values(state.deployments);

    return (
        <VStack spacing={6} align="stretch" w="100%">
            <Box p={5} shadow="md" borderWidth="1px">
                <HStack>
                    <Input 
                        placeholder="Enter service name (e.g., user-service)"
                        value={serviceName}
                        onChange={(e) => setServiceName(e.target.value)}
                    />
                    <Button colorScheme="blue" onClick={handleStartDeployment}>Start New Deployment</Button>
                    <Button onClick={() => dispatch({ type: 'LOGOUT' })}>Logout</Button>
                </HStack>
            </Box>

            <Table variant="simple">
                <Thead>
                    <Tr>
                        <Th>Service Name</Th>
                        <Th>Status</Th>
                        <Th>Deployed By</Th>
                        <Th>Actions</Th>
                    </Tr>
                </Thead>
                <Tbody>
                    {deployments.map(dep => (
                        <Tr key={dep.id}>
                            <Td>{dep.serviceName || 'N/A'}</Td>
                            <Td>
                                <Tag colorScheme={
                                    { InProgress: 'yellow', Succeeded: 'green', Failed: 'red', Pending: 'gray' }[dep.status]
                                }>{dep.status}</Tag>
                            </Td>
                            <Td>{dep.deployedBy}</Td>
                            <Td>
                                {dep.status === 'InProgress' && (
                                    <HStack>
                                        <Button size="sm" colorScheme="green" onClick={() => simulateOutcome(dep.id, true)}>Simulate Success</Button>
                                        <Button size="sm" colorScheme="red" onClick={() => simulateOutcome(dep.id, false)}>Simulate Fail</Button>
                                    </HStack>
                                )}
                            </Td>
                        </Tr>
                    ))}
                </Tbody>
            </Table>
        </VStack>
    );
}

这个组件是纯粹的声明式 UI。它从 AppContext 中读取 deployments 状态并渲染成表格。用户的操作(如点击按钮)会转化为一个 Command 对象,通过 apiClient 发送到后端的 /commands 端点。它自身不直接修改状态,状态的唯一来源是 SSE 推送的事件,由顶层的 App 组件处理。这就是单向数据流的美妙之处:UI 变得非常可预测。

方案局限性与未来展望

我们构建的这个系统虽然验证了核心架构,但在走向生产环境之前,仍有几个关键问题需要解决:

  1. 事件存储的持久化: 内存存储仅用于演示。必须替换为真正的持久化方案。使用 PostgreSQL 将事件存储为 JSONB 字段是一种常见且可靠的选择,配合表分区和索引能获得不错的性能。
  2. 快照(Snapshotting): 对于生命周期很长的聚合,每次都从头加载所有事件会变得非常慢。引入快照机制——定期将聚合的当前状态序列化并存储起来——是必要的性能优化。加载时,只需加载最新的快照和之后发生的事件即可。
  3. 读模型(Read Model): 当前前端的投影是实时在内存中计算的。对于复杂的查询需求(例如,按用户查询所有部署、统计成功率等),后端需要构建专门的、持久化的读模型。这通常通过一个独立的订阅者服务实现,它监听事件流并将数据写入一个为查询优化的数据库(如 Elasticsearch 或关系型数据库)。
  4. API Gateway 扩展性: 我们的 Node.js 服务器承载了太多职责。在微服务架构中,WebAuthn 认证、指令处理、SSE 事件推送可能会被拆分成不同的服务。API Gateway 则专注于路由、限流和认证,而后端服务之间通过消息队列(如 Kafka)进行通信,以实现更好的解耦和弹性。
  5. 事件 Schema 管理: 随着业务发展,事件的结构会发生变化。需要一套完整的 Schema 版本管理和迁移策略(类似 Avro 或 Protobuf),以确保新旧版本的消费者都能正确处理事件。

  目录