
关于
高吞吐量事件流和实时数据摄取,使用 Azure Event Hub TypeScript SDK。
name: azure-eventhub-ts description: "高吞吐量事件流和实时数据摄取。" risk: unknown source: community date_added: "2026-02-27"
Azure Event Hubs TypeScript SDK
高吞吐量事件流和实时数据摄取。
安装
npm install @azure/event-hubs @azure/identity
用于消费者组的检查点:
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
环境变量
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
认证
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";
const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();
// 生产者
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
// 消费者
const consumer = new EventHubConsumerClient(
"$Default", // 消费者组
fullyQualifiedNamespace,
eventHubName,
credential
);
核心工作流
发送事件
const producer = new EventHubProducerClient(namespace, eventHubName, credential);
// 创建批次并添加事件
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });
await producer.sendBatch(batch);
await producer.close();
发送到特定分区
// 按分区 ID
const batch = await producer.createBatch({ partitionId: "0" });
// 按分区键(一致性哈希)
const batch = await producer.createBatch({ partitionKey: "device-123" });
接收事件(简单模式)
const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
}
},
processError: async (err, context) => {
console.error(`Error on partition ${context.partitionId}: ${err.message}`);
},
});
// 一段时间后停止
setTimeout(async () => {
await subscription.close();
await consumer.close();
}, 60000);
带检查点的接收(生产环境)
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
const containerClient = new ContainerClient(
`https://${storageAccount}.blob.core.windows.net/${containerName}`,
credential
);
const checkpointStore = new BlobCheckpointStore(containerClient);
const consumer = new EventHubConsumerClient(
"$Default",
namespace,
eventHubName,
credential,
checkpointStore
);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Processing: ${JSON.stringify(event.body)}`);
}
// 处理批次后设置检查点
if (events.length > 0) {
await context.updateCheckpoint(events[events.length - 1]);
}
},
processError: async (err, context) => {
console.error(`Error: ${err.message}`);
},
});
从特定位置接收
const subscription = consumer.subscribe({
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
}, {
startPosition: {
// 从头开始
"0": { offset: "@earliest" },
// 从末尾开始(仅新事件)
"1": { offset: "@latest" },
// 从特定偏移量开始
"2": { offset: "12345" },
// 从特定时间开始
"3": { enqueuedOn: new Date("2024-01-01") },
},
});
Event Hub 属性
// 获取 Hub 信息
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);
// 获取分区信息
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);
批处理选项
const subscription = consumer.subscribe(
{
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
},
{
maxBatchSize: 100, // 每批最大事件数
maxWaitTimeInSeconds: 30, // 批次最大等待时间
}
);
兼容工具
Claude CodeCursor
标签
AI与机器学习