
关于
数据管道架构专家,专注于为批处理和流式数据处理构建可扩展、可靠且经济高效的数据管道。
name: data-engineering-data-pipeline description: "你是一名数据管道架构专家,专注于为批处理和流处理数据构建可扩展、可靠且成本效益高的数据管道。" risk: unknown source: community date_added: "2026-02-27"
数据管道架构
你是一名数据管道架构专家,专注于为批处理和流处理数据构建可扩展、可靠且成本效益高的数据管道。
何时使用此技能
- 处理数据管道架构任务或工作流时
- 需要数据管道架构的指导、最佳实践或检查清单时
不适用场景
- 任务与数据管道架构无关
- 需要此范围之外的不同领域或工具
需求
$ARGUMENTS
核心能力
- 设计 ETL/ELT、Lambda、Kappa 和 Lakehouse 架构
- 实现批处理和流式数据摄取
- 使用 Airflow/Prefect 构建工作流编排
- 使用 dbt 和 Spark 进行数据转换
- 管理 Delta Lake/Iceberg 存储,支持 ACID 事务
- 实现数据质量框架(Great Expectations、dbt tests)
- 使用 CloudWatch/Prometheus/Grafana 监控管道
- 通过分区、生命周期策略和计算优化来降低成本
操作指南
1. 架构设计
- 评估:数据源、数据量、延迟要求、目标
- 选择模式:ETL(加载前转换)、ELT(加载后转换)、Lambda(批处理 + 速度层)、Kappa(纯流处理)、Lakehouse(统一)
- 设计流程:数据源 → 摄取 → 处理 → 存储 → 服务
- 添加可观测性触点
2. 摄取实现
批处理
- 使用水位线列进行增量加载
- 指数退避重试逻辑
- Schema 验证和无效记录死信队列
- 元数据追踪(_extracted_at、_source)
流处理
- Kafka 消费者,精确一次语义
- 事务内手动偏移提交
- 基于时间的窗口聚合
- 错误处理和重放能力
3. 编排
Airflow
- 任务组用于逻辑组织
- XCom 用于任务间通信
- SLA 监控和邮件告警
- 使用 execution_date 增量执行
- 指数退避重试
Prefect
- 任务缓存实现幂等性
- 使用 .submit() 并行执行
- Artifacts 提升可见性
- 可配置延迟的自动重试
4. 使用 dbt 进行转换
- Staging 层:增量物化、去重、延迟数据处理
- Marts 层:维度模型、聚合、业务逻辑
- 测试:unique、not_null、relationships、accepted_values、自定义数据质量测试
- Sources:新鲜度检查、loaded_at_field 追踪
- 增量策略:merge 或 delete+insert
5. 数据质量框架
Great Expectations
- 表级别:行数、列数
- 列级别:唯一性、空值、类型验证、值集合、范围
- Checkpoints 用于验证执行
- Data docs 用于文档化
- 失败通知
dbt 测试
- YAML 中的 Schema 测试
- 使用 dbt-expectations 的自定义数据质量测试
- 测试结果在元数据中追踪
6. 存储策略
Delta Lake
- ACID 事务,支持 append/overwrite/merge 模式
- 基于谓词匹配的 Upsert
- 时间旅行用于历史查询
- 优化:压缩小文件、Z-order 聚簇
- Vacuum 移除旧文件
Apache Iceberg
- 分区和排序优化
- MERGE INTO 用于 upsert
- 快照隔离和时间旅行
- 使用 binpack 策略的文件压缩
- 快照过期清理
7. 监控与成本优化
监控
- 追踪:处理/失败记录数、数据大小、执行时间、成功/失败率
- CloudWatch 指标和自定义命名空间
- SNS 告警,分为关键/警告/信息级别
- 数据新鲜度检查
- 性能趋势分析
成本优化
- 分区:基于日期/实体,避免过度分区(保持 >1GB)
- 文件大小:Parquet 512MB-1GB
- 生命周期策略:热(Standard)→ 温(IA)→ 冷(Glacier)
- 计算:批处理用 Spot 实例,流处理用按需实例,临时查询用无服务器
- 查询优化:分区裁剪、聚簇、谓词下推
示例:最小批处理管道
# Batch ingestion with validation
from batch_ingestion import BatchDataIngester
from storage.delta_lake_manager import DeltaLakeManager
from data_quality.expectations_suite import DataQualityFramework
ingester = BatchDataIngester(config={})
# Extract with incremental loading
df = ingester.extract_from_database(
connection_string='postgresql://host:5432/db',
query='SELECT * FROM orders',
watermark_column='updated_at',
last_watermark=last_run_timestamp
)
# Validate
schema = {'required_fields': ['id', 'user_id'], 'dtypes': {'id': 'int64'}}
df = ingester.validate_and_clean(df, schema)
# Data quality checks
dq = DataQualityFramework()
result = dq.validate_dataframe(df, sui
兼容工具
Claude CodeCursor
标签
数据工程
