一个棘手的问题摆在面前:如何构建一个多人协作的操作平台,既要保证操作的绝对安全,又要让所有在线用户的界面实时、精准地同步状态,同时还需保留一份不可篡改的完整操作审计日志。传统的 CRUD + WebSocket 方案能勉强应付,但在真实项目中,状态同步的复杂性、数据冲突的解决、以及审计日志的可靠性很快会成为维护的噩梦。尤其是在金融或运维这类高风险领域,一个操作的指令、执行过程、最终结果必须被精确记录。
我们决定彻底放弃传统的状态模型,转而拥抱事件溯源(Event Sourcing)。系统的核心不再是当前状态,而是一系列不可变的领域事件(Domain Events)。状态只是这些事件按顺序聚合的结果。这个决定引出了一系列新的技术挑战:如何安全地接收产生事件的指令(Command)?如何高效地将事件推送给前端?前端又该如何响应式地处理这些事件流?
这便是我们这次技术实践的起点。我们将构建一个端到端的解决方案,涵盖从前端到后端的完整流程:
- 认证层: 采用 WebAuthn 实现无密码认证,确保指令来源的安全。
- 网关层: 构建一个轻量级 API Gateway,负责处理指令、管理 SSE 连接。
- 核心逻辑层: 实现一个基础的 Event Sourcing 核心,包括 Command、Event 和 Aggregate。
- 推送层: 使用 Server-Sent Events (SSE) 将事件流实时推送到客户端。
- 展现层: 利用 Chakra UI 和 React Hooks 优雅地消费事件流,构建一个响应式的用户界面。
架构蓝图
在深入代码之前,我们先用流程图来定义整个系统的数据流和组件交互。这有助于理清思路,尤其是在处理这种跨越多个技术栈的复杂场景时。
sequenceDiagram participant User as 用户 participant Frontend as Chakra UI 前端 participant Gateway as Node.js API Gateway participant EventStore as 事件存储 participant Auth as WebAuthn 服务 User->>Frontend: 1. 触发登录/操作 Frontend->>Auth: 2. 请求 WebAuthn Challenge Auth-->>Frontend: 3. 返回 Challenge User->>Frontend: 4. 使用生物识别签名 Frontend->>Auth: 5. 发送签名后的 Challenge Auth-->>Frontend: 6. 验证成功,下发 JWT Frontend->>Frontend: 7. 存储 JWT Note over User, EventStore: 用户已登录,开始执行操作 User->>Frontend: 8. 点击“部署服务”按钮 Frontend->>Gateway: 9. 发起 POST /commands (携带JWT和Command) Gateway->>Gateway: 10. 验证 JWT Gateway->>EventStore: 11. 执行 Command, 生成 Events EventStore->>EventStore: 12. 持久化 Events EventStore-->>Gateway: 13. 返回成功 Gateway->>Gateway: 14. 将新 Events 广播给所有 SSE 连接 Gateway-->>Frontend: 15. 通过已建立的 SSE 连接推送 Event Frontend->>Frontend: 16. 接收 Event, 更新本地状态 User->>Frontend: 17. 界面实时更新
这个流程清晰地展示了认证、指令处理和实时更新的分离。WebAuthn 负责入口安全,API Gateway 是指令和事件的中枢,Event Store 是事实的唯一来源,而 SSE 则是连接后端事实与前端展现的管道。
核心实现:事件溯源与 API 网关
我们的后端将使用 Node.js 和 Express 构建。它不仅是 API Gateway,也包含了事件溯源的核心逻辑。
1. 事件存储 (Event Store)
在生产环境中,事件存储通常会使用专门的数据库如 EventStoreDB 或基于 PostgreSQL/Kafka 构建。为了聚焦核心逻辑,我们先在内存中实现一个简单的 Event Store。这里的关键在于,它只允许追加事件,绝不允许修改或删除。
src/event-store.js
// src/event-store.js
import { EventEmitter } from 'events';
// 一个非常基础的内存事件存储,用于演示
// 生产环境应替换为持久化存储,如PostgreSQL, Kafka, or EventStoreDB
class InMemoryEventStore {
constructor() {
this.events = []; // 存储所有事件的数组
this.emitter = new EventEmitter(); // 用于在事件保存后发出通知
}
/**
* 保存事件到存储中
* @param {string} aggregateId - 聚合ID
* @param {Array<object>} events - 要保存的事件数组
* @param {number} expectedVersion - 预期的聚合版本号,用于乐观并发控制
* @throws {Error} 如果版本冲突
*/
saveEvents(aggregateId, events, expectedVersion) {
const currentVersion = this.getAggregateVersion(aggregateId);
// 简单的乐观并发检查
if (currentVersion !== expectedVersion) {
console.error(`Concurrency error for aggregate ${aggregateId}. Expected version ${expectedVersion}, but found ${currentVersion}.`);
throw new Error('Concurrency conflict: Aggregate has been modified by another operation.');
}
// 为每个事件附加元数据
let version = currentVersion;
for (const event of events) {
version++;
event.meta = {
aggregateId,
version,
timestamp: new Date().toISOString()
};
this.events.push(event);
}
// 成功保存后,通过事件发射器通知监听者
// 这是将事件推送到SSE的关键钩子
this.emitter.emit('events_saved', events);
console.log(`[EventStore] Saved ${events.length} events for aggregate ${aggregateId}. New version is ${version}.`);
}
/**
* 根据聚合ID获取事件历史
* @param {string} aggregateId
* @returns {Array<object>}
*/
getEventsForAggregate(aggregateId) {
return this.events.filter(e => e.meta.aggregateId === aggregateId);
}
/**
* 获取指定聚合的当前版本号
* @param {string} aggregateId
* @returns {number}
*/
getAggregateVersion(aggregateId) {
const aggregateEvents = this.getEventsForAggregate(aggregateId);
if (aggregateEvents.length === 0) {
return 0; // 如果没有事件,版本号为0
}
return aggregateEvents[aggregateEvents.length - 1].meta.version;
}
/**
* 订阅事件保存的通知
* @param {function} listener
*/
subscribe(listener) {
this.emitter.on('events_saved', listener);
}
}
// 使用单例模式,确保整个应用共享一个事件存储实例
export const eventStore = new InMemoryEventStore();
这个实现包含了几个关键点:
- 乐观并发控制:
expectedVersion
参数是实现乐观锁的关键。在读取一个聚合、处理指令、再保存新事件的整个过程中,如果聚合的版本发生了变化,意味着有其他操作捷足先登,本次操作必须失败并重试。 - 事件发射器:
EventEmitter
是解耦的关键。当事件被成功保存后,它会发出一个events_saved
通知。我们的 SSE 服务将监听这个通知,以便将新事件广播出去。
2. 聚合 (Aggregate) 与指令 (Command)
我们以一个“服务部署”场景为例。Deployment
是我们的聚合根,它封装了状态和业务逻辑。
src/deployment-aggregate.js
// src/deployment-aggregate.js
import { eventStore } from './event-store.js';
// 定义事件类型
const EventTypes = {
DEPLOYMENT_STARTED: 'DeploymentStarted',
DEPLOYMENT_SUCCEEDED: 'DeploymentSucceeded',
DEPLOYMENT_FAILED: 'DeploymentFailed',
};
class DeploymentAggregate {
constructor(id) {
this.id = id;
this.version = 0;
this.status = 'Pending';
this.serviceName = null;
this.deployedBy = null;
this.history = [];
}
// 内部方法,根据事件更新聚合状态
_apply(event) {
switch (event.type) {
case EventTypes.DEPLOYMENT_STARTED:
this.status = 'InProgress';
this.serviceName = event.payload.serviceName;
this.deployedBy = event.payload.user;
break;
case EventTypes.DEPLOYMENT_SUCCEEDED:
this.status = 'Succeeded';
break;
case EventTypes.DEPLOYMENT_FAILED:
this.status = 'Failed';
this.history.push({ status: 'Failed', reason: event.payload.reason });
break;
}
this.version = event.meta.version;
this.history.push({ status: this.status, timestamp: event.meta.timestamp });
}
// 从事件历史中重建聚合状态
static rehydrate(id, events) {
const aggregate = new DeploymentAggregate(id);
events.forEach(event => aggregate._apply(event));
return aggregate;
}
// --- Command Handlers ---
// 处理“开始部署”指令
startDeployment(command) {
if (this.status !== 'Pending') {
throw new Error('Deployment has already been started.');
}
// 指令验证成功后,生成事件
const event = {
type: EventTypes.DEPLOYMENT_STARTED,
payload: {
serviceName: command.serviceName,
user: command.user,
},
};
return [event]; // 返回一个事件数组
}
// 处理“部署成功”指令
succeedDeployment() {
if (this.status !== 'InProgress') {
throw new Error('Deployment is not in progress.');
}
const event = { type: EventTypes.DEPLOYMENT_SUCCEEDED, payload: {} };
return [event];
}
// 处理“部署失败”指令
failDeployment(command) {
if (this.status !== 'InProgress') {
throw new Error('Deployment is not in progress.');
}
const event = { type: EventTypes.DEPLOYMENT_FAILED, payload: { reason: command.reason } };
return [event];
}
}
// Command Handler Function - 这是与外界交互的入口
export function handleDeploymentCommand(command) {
// 1. 从事件存储加载历史事件
const events = eventStore.getEventsForAggregate(command.aggregateId);
// 2. 重建聚合的当前状态
const deployment = DeploymentAggregate.rehydrate(command.aggregateId, events);
let newEvents;
// 3. 根据指令类型调用相应的处理方法
switch (command.type) {
case 'StartDeployment':
newEvents = deployment.startDeployment(command);
break;
case 'SucceedDeployment':
newEvents = deployment.succeedDeployment();
break;
case 'FailDeployment':
newEvents = deployment.failDeployment(command);
break;
default:
throw new Error(`Unknown command type: ${command.type}`);
}
// 4. 将新生成的事件保存到事件存储
// 传递当前版本号用于并发控制
if (newEvents && newEvents.length > 0) {
eventStore.saveEvents(command.aggregateId, newEvents, deployment.version);
}
}
这个聚合的实现遵循了 Event Sourcing 的核心原则:
- 状态由事件改变: 聚合的状态只能通过
_apply
方法在内部根据事件来修改。 - 指令生成事件: 公开的指令处理方法 (
startDeployment
等) 负责验证业务规则,然后生成事件,但它们自身不改变状态。 - 加载与重建:
handleDeploymentCommand
函数展示了标准流程:加载历史 -> 重建状态 -> 执行指令 -> 保存新事件。
3. API Gateway: 指令端点与 SSE 端点
现在我们来构建 Express 服务器,它将承载我们的 API Gateway。
src/server.js
// src/server.js
import express from 'express';
import cors from 'cors';
import bodyParser from 'body-parser';
import { v4 as uuidv4 } from 'uuid';
import { handleDeploymentCommand } from './deployment-aggregate.js';
import { eventStore } from './event-store.js';
import { setupAuthRoutes } from './auth.js'; // WebAuthn路由,稍后实现
import { authMiddleware } from './auth.js'; // JWT验证中间件
const app = express();
const PORT = process.env.PORT || 4000;
app.use(cors());
app.use(bodyParser.json());
// --- WebAuthn 认证路由 ---
// 这个函数会添加 /auth/register-challenge, /auth/register 等路由
setupAuthRoutes(app);
// --- SSE (Server-Sent Events) ---
let clients = [];
// 事件监听器,当有新事件保存时,广播给所有连接的客户端
eventStore.subscribe((events) => {
console.log(`[SSE] Broadcasting ${events.length} new events to ${clients.length} clients.`);
const eventData = `data: ${JSON.stringify(events)}\n\n`;
clients.forEach(client => client.res.write(eventData));
});
app.get('/events', authMiddleware, (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // 立刻发送头部信息
const clientId = uuidv4();
const newClient = { id: clientId, res };
clients.push(newClient);
console.log(`[SSE] Client ${clientId} connected. Total clients: ${clients.length}`);
// 当客户端关闭连接时,将其从列表中移除
req.on('close', () => {
clients = clients.filter(client => client.id !== clientId);
console.log(`[SSE] Client ${clientId} disconnected. Total clients: ${clients.length}`);
});
});
// --- Command Endpoint ---
app.post('/commands', authMiddleware, (req, res) => {
try {
const command = req.body;
// 在真实项目中,user应该从JWT中解析出来,确保安全
command.user = req.user.username;
console.log(`[Command] Received command: ${command.type} for aggregate ${command.aggregateId}`);
handleDeploymentCommand(command);
res.status(202).json({ message: 'Command accepted.' });
} catch (error) {
console.error(`[Command] Error processing command: ${error.message}`);
// 区分业务错误和系统错误
if (error.message.includes('Concurrency conflict')) {
res.status(409).json({ error: error.message }); // 409 Conflict
} else {
res.status(400).json({ error: error.message }); // 400 Bad Request for business rule violations
}
}
});
app.listen(PORT, () => {
console.log(`Server is running on http://localhost:${PORT}`);
});
这个服务器是整个系统的中枢:
-
/events
端点: 这是 SSE 的核心。它维持一个长连接,并将客户端的response
对象保存在clients
数组中。当eventStore
发出通知时,它会遍历所有客户端并写入格式化的事件数据。一个常见的坑是忘记设置正确的 HTTP Headers,特别是Content-Type: text/event-stream
。 -
/commands
端点: 这是接收指令的入口。它受到authMiddleware
的保护,确保只有经过认证的用户才能提交指令。它将请求体中的指令传递给聚合的命令处理器,并处理可能发生的业务异常或并发冲突。 - 客户端管理: 当前的
clients
数组实现很简单。在生产环境中,如果 API Gateway 是多实例部署的,这里必须使用外部消息中间件(如 Redis Pub/Sub)来广播事件,否则一个实例产生的事件无法通知到连接在其他实例上的客户端。
安全基石:WebAuthn 无密码认证
在高风险操作平台,密码认证的弱点是不可接受的。WebAuthn (FIDO2) 通过公钥加密提供了防钓鱼、防数据泄露的强大认证机制。我们使用 simple-webauthn
库来简化实现。
src/auth.js
// src/auth.js
import {
generateRegistrationOptions,
verifyRegistrationResponse,
generateAuthenticationOptions,
verifyAuthenticationResponse,
} from '@simplewebauthn/server';
import jwt from 'jsonwebtoken';
// 简单内存存储用户和认证器
// 生产环境应使用数据库
const users = {};
const JWT_SECRET = 'a-very-secret-key-for-our-project';
const RP_ID = 'localhost'; // Relying Party ID,必须与前端域名匹配
const RP_NAME = 'Realtime Event Sourcing Demo';
// 模拟数据库操作
const db = {
getUser: (username) => users[username],
saveUser: (user) => { users[user.username] = user; },
getAuthenticators: (username) => users[username]?.authenticators || [],
saveAuthenticator: (username, authenticator) => {
if (!users[username].authenticators) {
users[username].authenticators = [];
}
users[username].authenticators.push(authenticator);
}
};
export function setupAuthRoutes(app) {
// 1. 生成注册选项
app.post('/auth/register-challenge', (req, res) => {
const { username } = req.body;
if (!username || db.getUser(username)) {
return res.status(400).json({ error: 'Username is required or already exists.' });
}
db.saveUser({ username, authenticators: [] });
const options = generateRegistrationOptions({
rpName: RP_NAME,
rpID: RP_ID,
userName: username,
attestationType: 'none',
});
// 挑战需要临时保存,以便后续验证
users[username].currentChallenge = options.challenge;
res.json(options);
});
// 2. 验证注册响应
app.post('/auth/register', async (req, res) => {
const { username, response } = req.body;
const user = db.getUser(username);
if (!user) return res.status(404).send('User not found');
try {
const verification = await verifyRegistrationResponse({
response,
expectedChallenge: user.currentChallenge,
expectedOrigin: 'http://localhost:3000',
expectedRPID: RP_ID,
});
if (verification.verified) {
db.saveAuthenticator(username, verification.registrationInfo);
}
res.json({ verified: verification.verified });
} catch (error) {
res.status(400).json({ error: error.message });
}
});
// 3. 生成认证选项
app.post('/auth/login-challenge', (req, res) => {
const { username } = req.body;
const user = db.getUser(username);
if (!user) return res.status(404).json({ error: 'User not found.' });
const options = generateAuthenticationOptions({
allowCredentials: user.authenticators.map(auth => ({
id: auth.credentialID,
type: 'public-key',
})),
});
user.currentChallenge = options.challenge;
res.json(options);
});
// 4. 验证认证响应
app.post('/auth/login', async (req, res) => {
const { username, response } = req.body;
const user = db.getUser(username);
if (!user) return res.status(404).send('User not found');
const authenticator = user.authenticators.find(
auth => auth.credentialID.toString('base64url') === response.id
);
if (!authenticator) return res.status(400).send('Authenticator not found');
try {
const verification = await verifyAuthenticationResponse({
response,
expectedChallenge: user.currentChallenge,
expectedOrigin: 'http://localhost:3000',
expectedRPID: RP_ID,
authenticator,
});
if (verification.verified) {
const token = jwt.sign({ username }, JWT_SECRET, { expiresIn: '1h' });
res.json({ verified: true, token });
} else {
res.status(401).json({ verified: false });
}
} catch (error) {
res.status(400).json({ error: error.message });
}
});
}
// JWT 验证中间件
export function authMiddleware(req, res, next) {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.sendStatus(401);
}
const token = authHeader.split(' ')[1];
jwt.verify(token, JWT_SECRET, (err, user) => {
if (err) {
return res.sendStatus(403);
}
req.user = user;
next();
});
}
WebAuthn 的流程虽然多步,但逻辑是固定的:challenge
-> response
-> verification
。成功登录后,我们签发一个标准的 JWT,后续的 /commands
和 /events
端点都将使用这个 JWT 进行身份验证和授权。这是一个常见的错误点:开发者有时会忘记保护 SSE 连接,导致未经授权的用户也能订阅到敏感的事件流。
前端实现:Chakra UI 与实时事件消费
前端使用 React 和 Chakra UI。Chakra UI 提供了高质量的组件和优秀的开发体验,而 React Hooks (useState
, useEffect
, useReducer
) 则是管理和响应事件流的完美工具。
1. 前端状态与事件处理
我们将使用 useReducer
来管理前端的状态。useReducer
非常适合事件驱动的场景,因为 reducer 函数本身就是一个状态转换机,它的逻辑 (state, action) => newState
与我们聚合中的 (state, event) => newState
思想如出一辙。
src/App.js
// src/App.js
import React, { useEffect, useReducer, createContext, useContext } from 'react';
import { ChakraProvider, Box, Heading, VStack } from '@chakra-ui/react';
import { AuthComponent } from './components/AuthComponent';
import { DeploymentDashboard } from './components/DeploymentDashboard';
import { apiClient } from './apiClient';
// 状态管理
const initialState = {
isAuthenticated: false,
user: null,
token: null,
deployments: {}, // 使用对象存储,以ID为键
};
function appReducer(state, action) {
switch (action.type) {
case 'LOGIN_SUCCESS':
return {
...state,
isAuthenticated: true,
user: action.payload.user,
token: action.payload.token,
};
case 'LOGOUT':
return { ...initialState };
case 'EVENTS_RECEIVED':
// 这是一个关键的reducer,用于处理从SSE收到的事件
const newDeployments = { ...state.deployments };
for (const event of action.payload.events) {
const { aggregateId } = event.meta;
const existing = newDeployments[aggregateId] || { id: aggregateId, status: 'Pending', history: [] };
// 根据事件类型更新本地投影
switch (event.type) {
case 'DeploymentStarted':
existing.status = 'InProgress';
existing.serviceName = event.payload.serviceName;
existing.deployedBy = event.payload.user;
break;
case 'DeploymentSucceeded':
existing.status = 'Succeeded';
break;
case 'DeploymentFailed':
existing.status = 'Failed';
existing.reason = event.payload.reason;
break;
default: break;
}
existing.history.push({ status: existing.status, timestamp: event.meta.timestamp });
newDeployments[aggregateId] = existing;
}
return { ...state, deployments: newDeployments };
default:
return state;
}
}
const AppContext = createContext();
export const useAppContext = () => useContext(AppContext);
function App() {
const [state, dispatch] = useReducer(appReducer, initialState);
// 设置API客户端的认证token
useEffect(() => {
apiClient.defaults.headers.common['Authorization'] = state.token ? `Bearer ${state.token}` : '';
}, [state.token]);
// 核心:建立和管理SSE连接
useEffect(() => {
if (!state.isAuthenticated) return;
console.log('[SSE] Attempting to connect...');
const eventSource = new EventSource(`http://localhost:4000/events?token=${state.token}`);
eventSource.onopen = () => {
console.log('[SSE] Connection opened.');
};
eventSource.onmessage = (event) => {
const parsedEvents = JSON.parse(event.data);
console.log('[SSE] Events received:', parsedEvents);
dispatch({ type: 'EVENTS_RECEIVED', payload: { events: parsedEvents } });
};
eventSource.onerror = (err) => {
console.error('[SSE] Connection error:', err);
eventSource.close();
};
// 组件卸载时关闭连接
return () => {
console.log('[SSE] Closing connection.');
eventSource.close();
};
}, [state.isAuthenticated, state.token]);
return (
<ChakraProvider>
<AppContext.Provider value={{ state, dispatch }}>
<Box p={8}>
<VStack spacing={8}>
<Heading>Real-time Deployment Dashboard</Heading>
{!state.isAuthenticated ? (
<AuthComponent />
) : (
<DeploymentDashboard />
)}
</VStack>
</Box>
</AppContext.Provider>
</ChakraProvider>
);
}
export default App;
前端应用的核心逻辑:
- Reducer as Projection:
appReducer
担当了客户端的“投影”(Projection)角色。它接收来自后端的事件,并将它们应用到本地的状态树上,从而构建出一个只读的视图模型 (state.deployments
)。这本质上是 CQRS 模式在前端的体现。 - SSE 连接管理:
useEffect
hook 完美地处理了 SSE 连接的生命周期。它在用户认证后建立连接,并在用户登出或组件卸载时断开连接。一个需要注意的细节是,我们直接将 token 作为查询参数传递,因为EventSource
API 不支持设置自定义 Headers。因此,后端需要调整authMiddleware
来同时从 Header 和查询参数中读取 token。
2. UI 组件:认证与仪表盘
AuthComponent
会使用 @simplewebauthn/browser
库来与浏览器和安全密钥交互,其代码与库的官方示例类似,主要负责调用 API 和处理浏览器的 navigator.credentials
调用。
我们重点看 DeploymentDashboard
,它负责展示状态并发送指令。
src/components/DeploymentDashboard.js
// src/components/DeploymentDashboard.js
import React, { useState } from 'react';
import { useAppContext } from '../App';
import {
Box, Button, VStack, HStack, Text, Input,
Table, Thead, Tbody, Tr, Th, Td, Tag,
useToast
} from '@chakra-ui/react';
import { v4 as uuidv4 } from 'uuid';
import { apiClient } from '../apiClient';
export function DeploymentDashboard() {
const { state, dispatch } = useAppContext();
const [serviceName, setServiceName] = useState('');
const toast = useToast();
const handleStartDeployment = async () => {
if (!serviceName) {
toast({ title: 'Service name is required', status: 'warning', duration: 3000, isClosable: true });
return;
}
const command = {
aggregateId: uuidv4(), // 为新部署生成唯一ID
type: 'StartDeployment',
serviceName: serviceName,
};
try {
await apiClient.post('/commands', command);
toast({ title: 'Deployment command sent', status: 'success', duration: 2000 });
setServiceName('');
} catch (error) {
toast({ title: 'Failed to send command', description: error.response?.data?.error || error.message, status: 'error' });
}
};
// 模拟部署成功或失败
const simulateOutcome = async (aggregateId, success) => {
const command = {
aggregateId,
type: success ? 'SucceedDeployment' : 'FailDeployment',
...(success ? {} : { reason: 'Simulated infrastructure failure.' })
};
try {
await apiClient.post('/commands', command);
} catch (error) {
toast({ title: 'Failed to simulate outcome', description: error.response?.data?.error || error.message, status: 'error' });
}
}
const deployments = Object.values(state.deployments);
return (
<VStack spacing={6} align="stretch" w="100%">
<Box p={5} shadow="md" borderWidth="1px">
<HStack>
<Input
placeholder="Enter service name (e.g., user-service)"
value={serviceName}
onChange={(e) => setServiceName(e.target.value)}
/>
<Button colorScheme="blue" onClick={handleStartDeployment}>Start New Deployment</Button>
<Button onClick={() => dispatch({ type: 'LOGOUT' })}>Logout</Button>
</HStack>
</Box>
<Table variant="simple">
<Thead>
<Tr>
<Th>Service Name</Th>
<Th>Status</Th>
<Th>Deployed By</Th>
<Th>Actions</Th>
</Tr>
</Thead>
<Tbody>
{deployments.map(dep => (
<Tr key={dep.id}>
<Td>{dep.serviceName || 'N/A'}</Td>
<Td>
<Tag colorScheme={
{ InProgress: 'yellow', Succeeded: 'green', Failed: 'red', Pending: 'gray' }[dep.status]
}>{dep.status}</Tag>
</Td>
<Td>{dep.deployedBy}</Td>
<Td>
{dep.status === 'InProgress' && (
<HStack>
<Button size="sm" colorScheme="green" onClick={() => simulateOutcome(dep.id, true)}>Simulate Success</Button>
<Button size="sm" colorScheme="red" onClick={() => simulateOutcome(dep.id, false)}>Simulate Fail</Button>
</HStack>
)}
</Td>
</Tr>
))}
</Tbody>
</Table>
</VStack>
);
}
这个组件是纯粹的声明式 UI。它从 AppContext
中读取 deployments
状态并渲染成表格。用户的操作(如点击按钮)会转化为一个 Command 对象,通过 apiClient
发送到后端的 /commands
端点。它自身不直接修改状态,状态的唯一来源是 SSE 推送的事件,由顶层的 App
组件处理。这就是单向数据流的美妙之处:UI 变得非常可预测。
方案局限性与未来展望
我们构建的这个系统虽然验证了核心架构,但在走向生产环境之前,仍有几个关键问题需要解决:
- 事件存储的持久化: 内存存储仅用于演示。必须替换为真正的持久化方案。使用 PostgreSQL 将事件存储为 JSONB 字段是一种常见且可靠的选择,配合表分区和索引能获得不错的性能。
- 快照(Snapshotting): 对于生命周期很长的聚合,每次都从头加载所有事件会变得非常慢。引入快照机制——定期将聚合的当前状态序列化并存储起来——是必要的性能优化。加载时,只需加载最新的快照和之后发生的事件即可。
- 读模型(Read Model): 当前前端的投影是实时在内存中计算的。对于复杂的查询需求(例如,按用户查询所有部署、统计成功率等),后端需要构建专门的、持久化的读模型。这通常通过一个独立的订阅者服务实现,它监听事件流并将数据写入一个为查询优化的数据库(如 Elasticsearch 或关系型数据库)。
- API Gateway 扩展性: 我们的 Node.js 服务器承载了太多职责。在微服务架构中,WebAuthn 认证、指令处理、SSE 事件推送可能会被拆分成不同的服务。API Gateway 则专注于路由、限流和认证,而后端服务之间通过消息队列(如 Kafka)进行通信,以实现更好的解耦和弹性。
- 事件 Schema 管理: 随着业务发展,事件的结构会发生变化。需要一套完整的 Schema 版本管理和迁移策略(类似 Avro 或 Protobuf),以确保新旧版本的消费者都能正确处理事件。