
关于
Azure Monitor 数据摄入 Python SDK。通过日志摄入 API 向 Log Analytics 工作区发送自定义日志。
name: azure-monitor-ingestion-py description: 适用于 Python 的 Azure Monitor 摄取 SDK。用于通过日志摄取 API 将自定义日志发送到 Log Analytics 工作区。 risk: unknown source: community date_added: '2026-02-27'
适用于 Python 的 Azure Monitor 摄取 SDK
使用日志摄取 API 将自定义日志发送到 Azure Monitor Log Analytics 工作区。
安装
pip install azure-monitor-ingestion
pip install azure-identity
环境变量
# 数据收集端点 (DCE)
AZURE_DCE_ENDPOINT=https://<dce-name>.<region>.ingest.monitor.azure.com
# 数据收集规则 (DCR) 不可变 ID
AZURE_DCR_RULE_ID=dcr-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# DCR 中的流名称
AZURE_DCR_STREAM_NAME=Custom-MyTable_CL
前提条件
使用此 SDK 之前,你需要:
- Log Analytics 工作区 — 日志的目标位置
- 数据收集端点 (DCE) — 摄取端点
- 数据收集规则 (DCR) — 定义模式和目标
- 自定义表 — 在 Log Analytics 中(通过 DCR 或手动创建)
认证
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import DefaultAzureCredential
import os
client = LogsIngestionClient(
endpoint=os.environ["AZURE_DCE_ENDPOINT"],
credential=DefaultAzureCredential()
)
上传自定义日志
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import DefaultAzureCredential
import os
client = LogsIngestionClient(
endpoint=os.environ["AZURE_DCE_ENDPOINT"],
credential=DefaultAzureCredential()
)
rule_id = os.environ["AZURE_DCR_RULE_ID"]
stream_name = os.environ["AZURE_DCR_STREAM_NAME"]
logs = [
{"TimeGenerated": "2024-01-15T10:00:00Z", "Computer": "server1", "Message": "Application started"},
{"TimeGenerated": "2024-01-15T10:01:00Z", "Computer": "server1", "Message": "Processing request"},
{"TimeGenerated": "2024-01-15T10:02:00Z", "Computer": "server2", "Message": "Connection established"}
]
client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs)
从 JSON 文件上传
import json
with open("logs.json", "r") as f:
logs = json.load(f)
client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs)
自定义错误处理
使用回调处理部分失败:
failed_logs = []
def on_error(error):
print(f"Upload failed: {error.error}")
failed_logs.extend(error.failed_logs)
client.upload(
rule_id=rule_id,
stream_name=stream_name,
logs=logs,
on_error=on_error
)
# 重试失败的日志
if failed_logs:
print(f"Retrying {len(failed_logs)} failed logs...")
client.upload(rule_id=rule_id, stream_name=stream_name, logs=failed_logs)
忽略错误
def ignore_errors(error):
pass # 静默忽略上传失败
client.upload(
rule_id=rule_id,
stream_name=stream_name,
logs=logs,
on_error=ignore_errors
)
异步客户端
import asyncio
from azure.monitor.ingestion.aio import LogsIngestionClient
from azure.identity.aio import DefaultAzureCredential
async def upload_logs():
async with LogsIngestionClient(
endpoint=endpoint,
credential=DefaultAzureCredential()
) as client:
await client.upload(
rule_id=rule_id,
stream_name=stream_name,
logs=logs
)
asyncio.run(upload_logs())
主权云
from azure.identity import AzureAuthorityHosts, DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient
# Azure 政府云
credential = DefaultAzureCredential(authority=AzureAuthorityHosts.AZURE_GOVERNMENT)
client = LogsIngestionClient(
endpoint="https://example.ingest.monitor.azure.us",
credential=credential,
credential_scopes=["https://monitor.azure.us/.default"]
)
批处理行为
SDK 自动:
- 将日志分割为 1MB 或更小的块
- 使用 gzip 压缩每个块
- 并行上传各块
大型日志集无需手动批处理。
客户端类型
| 客户端 | 用途 |
|--------|---------|
| LogsIngestionClient | 用于上传日志的同步客户端 |
| LogsIngestionClient(aio) | 用于上传日志的异步客户端 |
关键概念
| 概念 | 描述 |
|---------|-------------|
| DCE | 数据收集端点 — 摄取 URL |
| DCR | 数据收集规则 — 定义模式、转换、目标 |
| 流 | DCR 内的命名数据流 |
| 自定义表 | Log Analytics 中的目标表(以 _CL 结尾) |
DCR 流名称格式
流名称遵循以下模式:
Custom-<TableName>_CL— 用于自定义表Microsoft-<TableName>— 用于内置表
最佳实践
- 使用 DefaultAzureCredential 进行认证
- 优雅地处理错误 — 使用
on_error回调处理部分失败 - 包含 TimeGenerated — 所有日志的必填字段
- 匹配 DCR 模式 — 日志字段必须匹配 DCR 配置中定义的列
兼容工具
Claude CodeCursor
标签
后端开发
