
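About
Azure Event Hubs SDK for Python stream processing. Use it for high-throughput event ingestion, producers, consumers, and checkpointing.
name: azure-eventhub-py
description: Azure Event Hubs Python SDK for stream processing. Use it for high-throughput event ingestion, producers, consumers, and checkpointing.
risk: unknown
source: community
date_added: '2026-02-27'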
Azure Event Hubs Python SDK
Azure Event Hubs is a big-data streaming platform for high-throughput event ingestion.
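Installation
```bash
pip install azure-eventhub azure-identity
# For Blob Storage checkpointing with the sync client
pip install azure-eventhub-checkpointstoreblob
# For Blob Storage checkpointing with the async (aio) client
pip install azure-eventhub-checkpointstoreblob-aio
```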
Environment Variables
```bash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpoints
```
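The examples below hardcode the namespace and hub name. One way to read the environment variables above instead is a small loader like the following. `EventHubSettings` and `load_settings` are hypothetical helpers, not part of the SDK, and treating the two storage variables as optional is an assumption (they are only needed for Blob checkpointing).

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class EventHubSettings:
    """Hypothetical container for the environment variables listed above."""
    namespace: str
    eventhub_name: str
    storage_account_url: Optional[str] = None
    checkpoint_container: Optional[str] = None


def load_settings(env: Mapping[str, str] = os.environ) -> EventHubSettings:
    """Read settings from the environment, failing fast if a required value is missing."""
    required = ("EVENT_HUB_FULLY_QUALIFIED_NAMESPACE", "EVENT_HUB_NAME")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return EventHubSettings(
        namespace=env["EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"],
        eventhub_name=env["EVENT_HUB_NAME"],
        # Storage settings are only needed when a BlobCheckpointStore is used.
        storage_account_url=env.get("STORAGE_ACCOUNT_URL"),
        checkpoint_container=env.get("CHECKPOINT_CONTAINER"),
    )
```

Pass `settings.namespace` as `fully_qualified_namespace` and `settings.eventhub_name` as `eventhub_name` when constructing the clients.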
Authentication
```python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"

# Producer
producer = EventHubProducerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    credential=credential,
)

# Consumer
consumer = EventHubConsumerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    consumer_group="$Default",
    credential=credential,
)
```
Client Types
| Client | Purpose |
|--------|---------|
| EventHubProducerClient | Sends events to an Event Hub |
| EventHubConsumerClient | Receives events from an Event Hub |
| BlobCheckpointStore | Persists consumer progress (checkpoints) in Blob Storage |
Sending Events
```python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential

producer = EventHubProducerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    credential=DefaultAzureCredential(),
)

with producer:
    # Create a batch; it enforces the maximum allowed message size
    event_data_batch = producer.create_batch()
    for i in range(10):
        try:
            event_data_batch.add(EventData(f"Event {i}"))
        except ValueError:
            # Batch is full: send it and start a new one
            producer.send_batch(event_data_batch)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(f"Event {i}"))
    # Send the remaining events
    producer.send_batch(event_data_batch)
```
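The loop above relies on `add()` raising `ValueError` once the batch reaches its size limit. The sketch below models that contract with a hypothetical `SizeLimitedBatch` class (a plain byte-count limit, not the SDK's real size accounting) so the packing logic can be exercised without a live Event Hub. The hypothetical `send_all` also refuses to send an empty batch when a single item is too large on its own, and skips a trailing empty batch.

```python
from typing import Any, Callable, List


class SizeLimitedBatch:
    """Hypothetical stand-in for EventDataBatch: add() raises ValueError when full."""

    def __init__(self, max_bytes: int) -> None:
        self.max_bytes = max_bytes
        self.items: List[bytes] = []
        self.size = 0

    def __len__(self) -> int:
        return len(self.items)

    def add(self, item: bytes) -> None:
        if self.size + len(item) > self.max_bytes:
            raise ValueError("batch is full")
        self.items.append(item)
        self.size += len(item)


def send_all(items: List[Any], new_batch: Callable[[], Any],
             send: Callable[[Any], None]) -> None:
    """Greedily pack items into batches, sending a batch when the next item no longer fits."""
    batch = new_batch()
    for item in items:
        try:
            batch.add(item)
        except ValueError:
            if len(batch) == 0:
                # Too large even for an empty batch: fail instead of sending an empty batch.
                raise
            send(batch)
            batch = new_batch()
            batch.add(item)  # still raises ValueError if the item alone exceeds the limit
    if len(batch) > 0:
        send(batch)  # flush the last, partially filled batch
```

Because `send_all` only uses `add()`, `len()` and the `ValueError` contract, it could in principle be called as `send_all(events, producer.create_batch, producer.send_batch)`; that assumes `EventDataBatch` supports `len()`, which you should confirm for your SDK version.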
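Sending to a Specific Partition
```python
# Inside the `with producer:` block above
# By partition ID
event_data_batch = producer.create_batch(partition_id="0")
# By partition key: the service hashes the key, so events with the same key go to the same partition
event_data_batch = producer.create_batch(partition_key="user-123")
```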
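The partition-key idea can be made concrete. Event Hubs hashes the key on the service side with its own internal algorithm, so you never compute this yourself; the hypothetical `partition_for_key` below uses SHA-256 purely to illustrate the property that matters: the same key always maps to the same partition, so events for one key stay in order relative to each other. It uses `hashlib` rather than the built-in `hash()`, because `hash()` of a string is randomized per process.

```python
import hashlib


def partition_for_key(key: str, partition_count: int) -> int:
    """Illustrative only: map a key to a partition index with a stable hash.

    This is NOT the algorithm Event Hubs uses; it only shows the
    "same key -> same partition" property of hash-based partitioning.
    """
    if partition_count < 1:
        raise ValueError("partition_count must be at least 1")
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    # Interpret the first 8 bytes as an unsigned integer, then reduce modulo the count.
    return int.from_bytes(digest[:8], "big") % partition_count
```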
Receiving Events
Simple Receive
```python
from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential

def on_event(partition_context, event):
    print(f"Partition: {partition_context.partition_id}")
    print(f"Data: {event.body_as_str()}")
    # Without a checkpoint store, this checkpoint is kept in memory only
    partition_context.update_checkpoint(event)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
)

with consumer:
    # Blocks until the consumer is closed
    consumer.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" means the beginning of the stream
    )
```
With Blob Checkpoint Store (Production)
```python
from azure.eventhub import EventHubConsumerClient
# Sync checkpoint store: requires the azure-eventhub-checkpointstoreblob package
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential

checkpoint_store = BlobCheckpointStore(
    blob_account_url="https://<account>.blob.core.windows.net",
    container_name="checkpoints",
    credential=DefaultAzureCredential(),
)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
    checkpoint_store=checkpoint_store,
)

def on_event(partition_context, event):
    print(f"Received: {event.body_as_str()}")
    # Checkpoint only after the event has been processed
    partition_context.update_checkpoint(event)

with consumer:
    consumer.receive(on_event=on_event)
```
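Calling `update_checkpoint` for every event writes to Blob Storage once per event. A common trade-off is to checkpoint every N events per partition, which the hypothetical `make_on_event` factory below does. After a restart, up to N-1 events per partition may be delivered again, so processing should tolerate duplicates. It also ignores `event is None`, which the SDK passes to `on_event` when `max_wait_time` is set and no event arrived in time.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict


def make_on_event(process: Callable[[Any], None], every: int = 100) -> Callable[[Any, Any], None]:
    """Build an on_event callback that checkpoints once per `every` events per partition."""
    if every < 1:
        raise ValueError("every must be at least 1")
    counts: DefaultDict[str, int] = defaultdict(int)

    def on_event(partition_context: Any, event: Any) -> None:
        if event is None:
            # Delivered only when max_wait_time is set and nothing arrived; nothing to process.
            return
        process(event)
        counts[partition_context.partition_id] += 1
        if counts[partition_context.partition_id] % every == 0:
            # Checkpoint after processing, so a crash can only cause reprocessing, not loss.
            partition_context.update_checkpoint(event)

    return on_event
```

Usage would look like `consumer.receive(on_event=make_on_event(handle_event, every=50))`, where `handle_event` is your own (hypothetical) processing function.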
Async Clients
```python
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential

async def send_events():
    # Async credentials hold their own HTTP session, so close them when done
    async with DefaultAzureCredential() as credential:
        async with EventHubProducerClient(
            fully_qualified_namespace="<namespace>.servicebus.windows.net",
            eventhub_name="my-eventhub",
            credential=credential,
        ) as producer:
            batch = await producer.create_batch()
            batch.add(EventData("Async event"))  # add() itself is synchronous
            await producer.send_batch(batch)

async def receive_events():
    async def on_event(partition_context, event):
        print(event.body_as_str())
        await partition_context.update_checkpoint(event)

    async with DefaultAzureCredential() as credential:
        async with EventHubConsumerClient(
            fully_qualified_namespace="<namespace>.servicebus.windows.net",
            eventhub_name="my-eventhub",
            consumer_group="$Default",
            credential=credential,
        ) as consumer:
            # Runs until the client is closed or the task is cancelled
            await consumer.receive(on_event=on_event)

asyncio.run(send_events())
```
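For the async consumer, `await consumer.receive(...)` does not return on its own. One way to get the graceful shutdown recommended under Best Practices is to run it as a task and close the client when a stop signal arrives. `run_until_stopped` is a hypothetical helper; it assumes `receive()` returns once `close()` has been awaited, and cancels the receive task if it has not finished within `grace_seconds`.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_until_stopped(
    consumer: Any,
    on_event: Callable[..., Awaitable[None]],
    stop: asyncio.Event,
    grace_seconds: float = 30.0,
) -> None:
    """Run consumer.receive() until `stop` is set, then close the consumer."""
    receive_task = asyncio.create_task(consumer.receive(on_event=on_event))
    stop_task = asyncio.create_task(stop.wait())
    # Wake up when shutdown is requested or receive() ends on its own (e.g. with an error).
    await asyncio.wait({receive_task, stop_task}, return_when=asyncio.FIRST_COMPLETED)
    stop_task.cancel()
    await asyncio.gather(stop_task, return_exceptions=True)
    # Ask the client to stop receiving.
    await consumer.close()
    try:
        # Assumes receive() returns after close(); re-raises its error if it failed.
        await asyncio.wait_for(receive_task, timeout=grace_seconds)
    except asyncio.TimeoutError:
        pass  # wait_for has already cancelled receive_task when the grace period ran out
```

On Unix, the stop event can be wired to process signals with `asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, stop.set)`.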
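Best Practices
- Always use `BlobCheckpointStore` for checkpointing in production; without a checkpoint store, checkpoints live in memory only and are lost on restart.
- Send events in batches for the best throughput.
- Implement graceful shutdown for long-running consumers.
- Monitor partition ownership to confirm that load is balanced across consumer instances.
- Use a retry policy for transient failures.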
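The clients already retry transient errors internally and accept retry settings such as `retry_total` when they are constructed. If you also want an application-level retry around a whole operation, capped exponential backoff with jitter is one option. `retry_with_backoff` is a hypothetical helper; which exception types count as transient is your choice, and `sleep` is injectable so the logic can be tested without waiting.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    transient: Tuple[Type[BaseException], ...],
    attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `operation`, retrying only `transient` errors with capped exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return operation()
        except transient:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last transient error
            # Full jitter: wait a random time in [0, min(max_delay, base_delay * 2**attempt)]
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")
```

With the SDK this might be called as `retry_with_backoff(lambda: producer.send_batch(batch), (ConnectionLostError, OperationTimeoutError))`, assuming those exception classes from `azure.eventhub.exceptions` match the SDK version you use.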
Compatible Tools
Claude Code, Cursor
Tags
AI & Machine Learning