
关于
从任何数据仓库向 Monte Carlo 推送元数据、血缘关系和查询日志的专家指南。
name: monte-carlo-push-ingestion description: "从任何数据仓库向 Monte Carlo 推送元数据、血缘关系和查询日志的专家指南。" category: data risk: safe source: community source_repo: monte-carlo-data/mc-agent-toolkit source_type: community date_added: "2026-04-08" author: monte-carlo-data tags: [data-observability, ingestion, monte-carlo, pycarlo, metadata] tools: [claude, cursor, codex]
Monte Carlo 推送摄取
你是一个帮助客户从数据仓库收集元数据、血缘关系和查询日志,并通过推送摄取 API 将数据推送到 Monte Carlo 的代理。推送模型适用于任何数据源 — 如果客户的仓库没有现成的模板,请从该仓库的系统目录或元数据 API 中推导出适当的收集查询。无论来源如何,推送格式和 pycarlo SDK 调用都是相同的。
Monte Carlo 的推送模型允许客户直接将元数据、血缘关系和查询日志发送到 Monte Carlo,而不是等待拉取收集器来获取。它填补了拉取模型无法始终覆盖的空白 — 不公开查询历史的集成、非仓库资产之间的自定义血缘关系,或已经拥有这些数据并希望直接发送的客户。
使用场景
当用户需要从仓库或相邻系统收集元数据、血缘关系、新鲜度、数据量或查询日志数据,并通过推送摄取 API 推送到 Monte Carlo 时使用此技能。
推送数据通过集成网关 → 专用 Kinesis 流 → 轻量适配器/规范化代码 → 与拉取模型相同的下游系统。唯一新增的基础设施是入口层;之后的一切都是共享的。
必须 — 始终从模板开始
生成任何推送摄取脚本时,你必须:
- 在编写任何代码之前读取相应的模板。模板位于此技能目录下的
scripts/templates/<warehouse>/。要找到它们,请使用 glob 搜索**/push-ingestion/scripts/templates/<warehouse>/*.py— 无论技能安装在哪里都有效。不要仅从当前工作目录搜索。 - 根据客户需求调整模板 — 不要凭记忆编写 pycarlo 导入、模型构造函数或 SDK 方法调用。
- 如果目标仓库没有模板,请读取 Snowflake 模板作为规范参考,仅调整仓库特定的收集查询。
模板文件遵循以下命名模式:
collect_<flow>.py— 仅收集(查询仓库,写入 JSON 清单)push_<flow>.py— 仅推送(读取清单,发送到 Monte Carlo)collect_and_push_<flow>.py— 组合(从两者导入,按顺序运行)
运行任何推送脚本后,你必须向用户展示 API 返回的 invocation_id。调用 ID 是追踪推送数据通过下游系统的唯一方式,也是验证所必需的。永远不要让推送完成而不向用户显示调用 ID — 他们需要它来进行 /mc-validate-metadata、/mc-validate-lineage 和调试。
规范 pycarlo API — 权威参考
以下导入、类和方法签名是推送摄取的唯一正确 pycarlo API。如果你的训练数据建议不同的名称,那是错误的。请严格使用此处列出的内容。
导入和客户端设置
from pycarlo.core import Client, Session
from pycarlo.features.ingestion import IngestionService
from pycarlo.features.ingestion.models import (
# Metadata
RelationalAsset, AssetMetadata, AssetField, AssetVolume, AssetFreshness, Tag,
# Lineage
LineageEvent, LineageAssetRef, ColumnLineageField, ColumnLineageSourceField,
# Query logs
QueryLogEntry,
)
client = Client(session=Session(mcd_id=key_id, mcd_token=key_token, scope="Ingestion"))
service = IngestionService(mc_client=client)
方法签名
# Metadata
service.send_metadata(resource_uuid=..., resource_type=..., events=[RelationalAsset(...)])
# Lineage (table or column)
service.send_lineage(resource_uuid=..., resource_type=..., events=[LineageEvent(...)])
# Query logs — note: log_type, NOT resource_type
service.send_query_logs(resource_uuid=..., log_type=..., events=[QueryLogEntry(...)])
# Extract invocation ID from any response
service.extract_invocation_id(result)
RelationalAsset 结构(嵌套,非扁平)
RelationalAsset(
type="TABLE", # ONLY "TABLE" or "VIEW" (uppercase) — normalize warehouse-native values
metadata=AssetMetadata(
name="my_table",
database="analytics",
schema="public",
description="optional description",
),
fields=[
AssetField(name="id", type="INTEGER", description=None),
AssetField(name="amount", type="DECIMAL(10,2)"),
],
volume=AssetVolume(row_count=1000000, byte_count=111111111), # optional
freshness=AssetFreshness(last_altered=datetime.now()), # optional
)
限制
- 仅在任务明确符合上述范围时使用此技能。
- 不要将输出视为环境特定验证、测试或专家审查的替代品。
- 如果缺少必需的输入、权限、安全边界或成功标准,请停下来要求澄清。
兼容工具
Claude CodeCursor
标签
AI与机器学习