
关于
Azure Event Hubs .NET SDK。
name: azure-eventhub-dotnet description: Azure Event Hubs SDK for .NET。 risk: unknown source: community date_added: '2026-02-27'
Azure.Messaging.EventHubs (.NET)
用于通过 Azure Event Hubs 发送和接收事件的高吞吐量事件流 SDK。
安装
# Core package (sending and simple receiving)
dotnet add package Azure.Messaging.EventHubs
# Processor package (production receiving with checkpointing)
dotnet add package Azure.Messaging.EventHubs.Processor
# Authentication
dotnet add package Azure.Identity
# For checkpointing (required by EventProcessorClient)
dotnet add package Azure.Storage.Blobs
当前版本: Azure.Messaging.EventHubs v5.12.2, Azure.Messaging.EventHubs.Processor v5.12.2
环境变量
EVENTHUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=<event-hub-name>
# For checkpointing (EventProcessorClient)
BLOB_STORAGE_CONNECTION_STRING=<storage-connection-string>
BLOB_CONTAINER_NAME=<checkpoint-container>
# Alternative: Connection string auth (not recommended for production)
EVENTHUB_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
身份验证
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
// Always use DefaultAzureCredential for production
var credential = new DefaultAzureCredential();
var fullyQualifiedNamespace = Environment.GetEnvironmentVariable("EVENTHUB_FULLY_QUALIFIED_NAMESPACE");
var eventHubName = Environment.GetEnvironmentVariable("EVENTHUB_NAME");
var producer = new EventHubProducerClient(
fullyQualifiedNamespace,
eventHubName,
credential);
所需 RBAC 角色:
- 发送: Azure Event Hubs Data Sender
- 接收: Azure Event Hubs Data Receiver
- 两者: Azure Event Hubs Data Owner
客户端类型
| 客户端 | 用途 | 何时使用 | |--------|---------|-------------| | EventHubProducerClient | 以批次方式立即发送事件 | 实时发送,完全控制批处理 | | EventHubBufferedProducerClient | 自动批处理与后台发送 | 高容量、即发即忘场景 | | EventHubConsumerClient | 简单事件读取 | 仅用于原型开发,不适用于生产 | | EventProcessorClient | 生产级事件处理 | 生产环境接收始终使用此客户端 |
核心工作流
1. 发送事件(批次)
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
await using var producer = new EventHubProducerClient(
fullyQualifiedNamespace,
eventHubName,
new DefaultAzureCredential());
// Create a batch (respects size limits automatically)
using EventDataBatch batch = await producer.CreateBatchAsync();
// Add events to batch
var events = new[]
{
new EventData(BinaryData.FromString("{\"id\": 1, \"message\": \"Hello\"}")),
new EventData(BinaryData.FromString("{\"id\": 2, \"message\": \"World\"}"))
};
foreach (var eventData in events)
{
if (!batch.TryAdd(eventData))
{
// Batch is full - send it and create a new one
await producer.SendAsync(batch);
batch = await producer.CreateBatchAsync();
if (!batch.TryAdd(eventData))
{
throw new Exception("Event too large for empty batch");
}
}
}
// Send remaining events
if (batch.Count > 0)
{
await producer.SendAsync(batch);
}
2. 发送事件(缓冲 - 高容量)
using Azure.Messaging.EventHubs.Producer;
var options = new EventHubBufferedProducerClientOptions
{
MaximumWaitTime = TimeSpan.FromSeconds(1)
};
await using var producer = new EventHubBufferedProducerClient(
fullyQualifiedNamespace,
eventHubName,
new DefaultAzureCredential(),
options);
// Handle send success/failure
producer.SendEventBatchSucceededAsync += args =>
{
Console.WriteLine($"Batch sent: {args.EventBatch.Count} events");
return Task.CompletedTask;
};
producer.SendEventBatchFailedAsync += args =>
{
Console.WriteLine($"Batch failed: {args.Exception.Message}");
return Task.CompletedTask;
};
// Enqueue events (sent automatically in background)
for (int i = 0; i < 1000; i++)
{
await producer.EnqueueEventAsync(new EventData($"Event {i}"));
}
// Flush remaining events before disposing
await producer.FlushAsync();
3. 接收事件(生产级 - EventProcessorClient)
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
// Blob container for checkpointing
var blobClient = new BlobContainerClient(
Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STRING"),
Environment.GetEnvironmentVariable("BLOB_CONTAINER_NAME"));
await blobClient.CreateIfNotExistsAsync();
// Create processor
var processor = new EventProcessorClient(
blobClient,
EventHubConsumerClient.DefaultConsumerGroupName,
fullyQualifiedNamespace,
eventHubName,
new DefaultAzureCredential());
兼容工具
Claude CodeCursor
标签
后端开发
