
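About
Azure Service Bus SDK for Python messaging. Use it for queues, topics, subscriptions, and enterprise messaging patterns.
name: azure-servicebus-py
description: Azure Service Bus SDK for Python messaging. Use for queues, topics, subscriptions, and enterprise messaging patterns.
risk: unknown
source: community
date_added: '2026-02-27'
Azure Service Bus Python SDK
Enterprise messaging for reliable cloud communication, with support for queues and publish/subscribe topics.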
Installation
pip install azure-servicebus azure-identity
Environment variables
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=myqueue
SERVICEBUS_TOPIC_NAME=mytopic
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
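These variables can be read once at startup and validated before any client is created. The following is a minimal sketch using only the standard library; the `ServiceBusSettings` dataclass and the `load_settings` helper are hypothetical names chosen for illustration and are not part of the SDK.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ServiceBusSettings:
    """Hypothetical container for the variables listed above."""
    fully_qualified_namespace: str
    queue_name: Optional[str] = None
    topic_name: Optional[str] = None
    subscription_name: Optional[str] = None


def load_settings(env: Mapping[str, str] = os.environ) -> ServiceBusSettings:
    """Read Service Bus settings, failing fast if the namespace is missing."""
    namespace = env.get("SERVICEBUS_FULLY_QUALIFIED_NAMESPACE", "").strip()
    if not namespace:
        raise RuntimeError("SERVICEBUS_FULLY_QUALIFIED_NAMESPACE is not set")
    # Entity names are optional: a sender-only process may need just one of them.
    return ServiceBusSettings(
        fully_qualified_namespace=namespace,
        queue_name=env.get("SERVICEBUS_QUEUE_NAME"),
        topic_name=env.get("SERVICEBUS_TOPIC_NAME"),
        subscription_name=env.get("SERVICEBUS_SUBSCRIPTION_NAME"),
    )
```

Passing the mapping as a parameter keeps the helper easy to test: a plain dictionary can stand in for `os.environ`.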
Authentication
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient

# DefaultAzureCredential tries several sources in turn (environment, managed identity, CLI login, ...)
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
client = ServiceBusClient(
    fully_qualified_namespace=namespace,
    credential=credential
)
Client types
| Client | Purpose | Obtained from |
|--------|---------|---------------|
| ServiceBusClient | Connection management | Instantiated directly |
| ServiceBusSender | Sending messages | client.get_queue_sender() / get_topic_sender() |
| ServiceBusReceiver | Receiving messages | client.get_queue_receiver() / get_subscription_receiver() |
Sending messages (async)
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential

async def send_messages():
    credential = DefaultAzureCredential()
    # Entering the credential as a context manager closes it together with the client
    async with credential, ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential
    ) as client:
        sender = client.get_queue_sender(queue_name="myqueue")
        async with sender:
            # Single message
            message = ServiceBusMessage("Hello, Service Bus!")
            await sender.send_messages(message)

            # List of messages sent in one call
            messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
            await sender.send_messages(messages)

            # Message batch (for size control)
            batch = await sender.create_message_batch()
            for i in range(100):
                try:
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
                except ValueError:  # Batch full
                    # Send the full batch, then retry the same message in a fresh one
                    await sender.send_messages(batch)
                    batch = await sender.create_message_batch()
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
            # Send whatever remains in the last batch
            await sender.send_messages(batch)

asyncio.run(send_messages())
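The flush-and-retry loop above can be studied in isolation, without a Service Bus namespace. The sketch below is an illustration only: `SizeLimitedBatch` is a hypothetical stand-in, not the SDK's batch class, and `send` is any callable supplied by the caller. It also guards one edge case, a single body larger than the limit, which can never fit and is re-raised rather than retried.

```python
from typing import Callable, Iterable, List


class SizeLimitedBatch:
    """Stand-in for a message batch (not the SDK class): rejects additions past a byte limit."""

    def __init__(self, max_size_in_bytes: int) -> None:
        self.max_size_in_bytes = max_size_in_bytes
        self.items: List[bytes] = []
        self.size_in_bytes = 0

    def add_message(self, body: bytes) -> None:
        if self.size_in_bytes + len(body) > self.max_size_in_bytes:
            raise ValueError("batch is full")
        self.items.append(body)
        self.size_in_bytes += len(body)


def send_in_batches(
    bodies: Iterable[bytes],
    max_size_in_bytes: int,
    send: Callable[[List[bytes]], None],
) -> int:
    """Pack bodies into batches, flushing whenever one fills up. Returns the number of batches sent."""
    sent = 0
    batch = SizeLimitedBatch(max_size_in_bytes)
    for body in bodies:
        try:
            batch.add_message(body)
        except ValueError:
            if not batch.items:
                raise  # a single body larger than the limit can never fit
            # Batch is full: flush it, then retry the same body in a fresh batch.
            send(batch.items)
            sent += 1
            batch = SizeLimitedBatch(max_size_in_bytes)
            batch.add_message(body)
    if batch.items:
        send(batch.items)  # flush the final, partially filled batch
        sent += 1
    return sent
```

For example, `send_in_batches([b"aaaa", b"bbbb", b"cccc"], 8, print)` sends two batches: the first two bodies together, then the third on its own.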
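Receiving messages (async)
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.identity.aio import DefaultAzureCredential

async def receive_messages():
    credential = DefaultAzureCredential()
    # Entering the credential as a context manager closes it together with the client
    async with credential, ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential
    ) as client:
        receiver = client.get_queue_receiver(queue_name="myqueue")
        async with receiver:
            # Receive up to 10 messages, waiting at most 5 seconds
            messages = await receiver.receive_messages(
                max_message_count=10,
                max_wait_time=5  # seconds
            )
            for msg in messages:
                print(f"Received: {str(msg)}")
                await receiver.complete_message(msg)  # Remove from queue

asyncio.run(receive_messages())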
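Receive modes
| Mode | Behavior | Use case |
|------|----------|----------|
| PEEK_LOCK (default) | Message is locked and must be completed or abandoned | Reliable processing |
| RECEIVE_AND_DELETE | Message is deleted as soon as it is received | At-most-once delivery |
from azure.servicebus import ServiceBusReceiveMode

# `client` is a ServiceBusClient created as shown under Authentication
receiver = client.get_queue_receiver(
    queue_name="myqueue",
    receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)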
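Message disposition
# ProcessingError and PermanentError are placeholders for application-defined exceptions
async with receiver:
    messages = await receiver.receive_messages(max_message_count=1)
    for msg in messages:
        try:
            # Process message...
            await receiver.complete_message(msg)  # Success - remove from queue
        except ProcessingError:
            await receiver.abandon_message(msg)  # Retry later
        except PermanentError:
            await receiver.dead_letter_message(
                msg,
                reason="ProcessingFailed",
                error_description="Could not process"
            )
| Action | Effect |
|--------|--------|
| complete_message() | Removes the message from the queue (success) |
| abandon_message() | Releases the lock; the message is immediately available for redelivery |
| dead_letter_message() | Moves the message to the dead-letter queue |
| defer_message() | Sets the message aside; receive it later by sequence number |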
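The choice between these actions can be separated from the receiver calls so that it is testable on its own. The sketch below is one possible policy, not the SDK's behavior: `ProcessingError`, `PermanentError`, `choose_disposition`, and the `max_attempts` threshold are all hypothetical names introduced for illustration. In real code the `delivery_count` argument would typically come from the received message's `delivery_count` property.

```python
from typing import Any, Callable


class ProcessingError(Exception):
    """Hypothetical application error for transient failures worth retrying."""


class PermanentError(Exception):
    """Hypothetical application error for failures that will never succeed."""


def choose_disposition(
    handler: Callable[[Any], None],
    payload: Any,
    delivery_count: int,
    max_attempts: int = 5,
) -> str:
    """Run the handler and return which settlement action the caller should take."""
    try:
        handler(payload)
    except PermanentError:
        # Retrying cannot help: move the message to the dead-letter queue.
        return "dead_letter"
    except ProcessingError:
        # Transient failure: retry until the (assumed) attempt budget is spent.
        return "dead_letter" if delivery_count >= max_attempts else "abandon"
    return "complete"
```

The caller then maps the returned string to `complete_message()`, `abandon_message()`, or `dead_letter_message()` on the receiver.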
Topics and subscriptions
# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
    await sender.send_messages(ServiceBusMessage("Topic message"))
# Receive from subscripti
Compatible tools
Claude Code, Cursor
Tags
Operations and deployment

