
关于
为事件溯源系统设计和实现事件存储。适用于构建事件溯源基础设施、选择事件存储技术或实现事件持久化模式
name: event-store-design description: "设计和实现事件溯源系统的事件存储。用于构建事件溯源基础设施、选择事件存储技术或实现事件持久化模式。" risk: unknown source: community date_added: "2026-02-27"
事件存储设计
事件溯源应用程序事件存储设计综合指南。
不要在以下情况使用此技能
- 任务与事件存储设计无关
- 你需要此范围之外的不同领域或工具
说明
- 明确目标、约束和所需输入。
- 应用相关最佳实践并验证结果。
- 提供可操作的步骤和验证方法。
- 如需详细示例,请打开
resources/implementation-playbook.md。
在以下情况使用此技能
- 设计事件溯源基础设施
- 在事件存储技术之间进行选择
- 实现自定义事件存储
- 优化事件存储和检索
- 设置事件存储 Schema
- 规划事件存储扩展
核心概念
1. 事件存储架构
┌─────────────────────────────────────────────────────┐
│ Event Store │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │
│ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ Event 1 │ │ Event 1 │ │ Event 1 │ │
│ │ Event 2 │ │ Event 2 │ │ Event 2 │ │
│ │ Event 3 │ │ ... │ │ Event 3 │ │
│ │ ... │ │ │ │ Event 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘
2. 事件存储要求
| 要求 | 描述 | | --- | --- | | 仅追加 | 事件不可变,只能追加 | | 有序 | 流内和全局排序 | | 版本化 | 乐观并发控制 | | 订阅 | 实时事件通知 | | 幂等 | 安全处理重复写入 |
技术对比
| 技术 | 最适合 | 局限性 | | --- | --- | --- | | EventStoreDB | 纯事件溯源 | 单一用途 | | PostgreSQL | 现有 Postgres 技术栈 | 需手动实现 | | Kafka | 高吞吐量流处理 | 不适合按流查询 | | DynamoDB | Serverless,AWS 原生 | 查询限制 | | Marten | .NET 生态系统 | 仅限 .NET |
模板
模板 1:PostgreSQL 事件存储 Schema
-- Events table
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id VARCHAR(255) NOT NULL,
stream_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
version BIGINT NOT NULL,
global_position BIGSERIAL,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);
-- Index for stream queries
CREATE INDEX idx_events_stream_id ON events(stream_id, version);
-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);
-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);
-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);
-- Snapshots table
CREATE TABLE snapshots (
stream_id VARCHAR(255) PRIMARY KEY,
stream_type VARCHAR(255) NOT NULL,
snapshot_data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Subscriptions checkpoint table
CREATE TABLE subscription_checkpoints (
subscription_id VARCHAR(255) PRIMARY KEY,
last_position BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
模板 2:Python 事件存储实现
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional, List
from uuid import UUID, uuid4
import json
import asyncpg
@dataclass
class Event:
stream_id: str
event_type: str
data: dict
metadata: dict = field(default_factory=dict)
event_id: UUID = field(default_factory=uuid4)
version: Optional[int] = None
global_position: Optional[int] = None
created_at: datetime = field(default_factory=datetime.utcnow)
class EventStore:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def append_events(
s
兼容工具
Claude CodeCursor
标签
通用