
关于
管理分布式事务和长时间运行业务流程的模式。
name: saga-orchestration description: "管理分布式事务和长时间运行业务流程的模式。" risk: unknown source: community date_added: "2026-02-27"
Saga 编排
管理分布式事务和长时间运行业务流程的模式。
不适用场景
- 任务与 Saga 编排无关
- 需要此范围之外的其他领域或工具
使用说明
- 明确目标、约束条件和所需输入。
- 应用相关最佳实践并验证结果。
- 提供可操作的步骤和验证方法。
- 如需详细示例,请打开
resources/implementation-playbook.md。
适用场景
- 协调多服务事务
- 实现补偿事务
- 管理长时间运行的业务工作流
- 处理分布式系统中的故障
- 构建订单履行流程
- 实现审批工作流
核心概念
1. Saga 类型
Choreography(编排式) Orchestration(协调式)
┌─────┐ ┌─────┐ ┌─────┐ ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│ │ Orchestrator│
└─────┘ └─────┘ └─────┘ └──────┬──────┘
│ │ │ │
▼ ▼ ▼ ┌─────┼─────┐
Event Event Event ▼ ▼ ▼
┌────┐┌────┐┌────┐
│Svc1││Svc2││Svc3│
└────┘└────┘└────┘
2. Saga 执行状态
| 状态 | 描述 | | ---------------- | ------------------------------ | | Started | Saga 已启动 | | Pending | 等待步骤完成 | | Compensating | 因故障正在回滚 | | Completed | 所有步骤成功完成 | | Failed | 补偿后 Saga 失败 |
模板
模板 1:Saga 编排器基类
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime
import uuid
class SagaState(Enum):
STARTED = "started"
PENDING = "pending"
COMPENSATING = "compensating"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class SagaStep:
name: str
action: str
compensation: str
status: str = "pending"
result: Optional[Dict] = None
error: Optional[str] = None
executed_at: Optional[datetime] = None
compensated_at: Optional[datetime] = None
@dataclass
class Saga:
saga_id: str
saga_type: str
state: SagaState
data: Dict[str, Any]
steps: List[SagaStep]
current_step: int = 0
created_at: datetime = field(default_factory=datetime.utcnow)
updated_at: datetime = field(default_factory=datetime.utcnow)
class SagaOrchestrator(ABC):
"""Base class for saga orchestrators."""
def __init__(self, saga_store, event_publisher):
self.saga_store = saga_store
self.event_publisher = event_publisher
@abstractmethod
def define_steps(self, data: Dict) -> List[SagaStep]:
"""Define the saga steps."""
pass
@property
@abstractmethod
def saga_type(self) -> str:
"""Unique saga type identifier."""
pass
async def start(self, data: Dict) -> Saga:
"""Start a new saga."""
saga = Saga(
saga_id=str(uuid.uuid4()),
saga_type=self.saga_type,
state=SagaState.STARTED,
data=data,
steps=self.define_steps(data)
)
await self.saga_store.save(saga)
await self._execute_next_step(saga)
return saga
async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
"""Handle successful step completion."""
saga = await self.saga_store.get(saga_id)
# Update step
for step in saga.steps:
if step.name == step_name:
step.status = "completed"
step.result = result
step.executed_at = datetime.utcnow()
break
saga.current_step += 1
saga.updated_at = datetime.utcnow()
# Check if saga is complete
if saga.current_step >= len(saga.steps):
saga.state = SagaState.COMPLETED
await self.saga_store.save(saga)
await self._on_saga_completed(saga)
else:
saga.state = SagaState.PENDING
await self.saga_store.save(saga)
await self._execute_next_step(saga)
async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
"""Handle step failure - start compensation."""
saga = await self.saga_store.get(saga_id)
# Mark step as failed
for step in saga.steps:
if step.name == step_name:
step.status = "failed"
step.error = error
break
saga.state = SagaState.COMPENSATING
兼容工具
Claude CodeCursor
标签
后端开发
