
关于
Azure Queue Storage Python SDK。用于可靠的消息队列、任务分发和异步处理。
name: azure-storage-queue-py description: Azure Queue Storage Python SDK。用于可靠的消息队列、任务分发和异步处理。 risk: unknown source: community date_added: '2026-02-27'
Azure Queue Storage Python SDK
简单、经济高效的异步通信消息队列。
安装
pip install azure-storage-queue azure-identity
环境变量
AZURE_STORAGE_ACCOUNT_URL=https://<account>.queue.core.windows.net
认证
from azure.identity import DefaultAzureCredential
from azure.storage.queue import QueueServiceClient, QueueClient
credential = DefaultAzureCredential()
account_url = "https://<account>.queue.core.windows.net"
# 服务客户端
service_client = QueueServiceClient(account_url=account_url, credential=credential)
# 队列客户端
queue_client = QueueClient(account_url=account_url, queue_name="myqueue", credential=credential)
队列操作
# 创建队列
service_client.create_queue("myqueue")
# 获取队列客户端
queue_client = service_client.get_queue_client("myqueue")
# 删除队列
service_client.delete_queue("myqueue")
# 列出队列
for queue in service_client.list_queues():
print(queue.name)
发送消息
# 发送消息(字符串)
queue_client.send_message("Hello, Queue!")
# 带选项发送
queue_client.send_message(
content="Delayed message",
visibility_timeout=60, # 隐藏 60 秒
time_to_live=3600 # 1 小时后过期
)
# 发送 JSON
import json
data = {"task": "process", "id": 123}
queue_client.send_message(json.dumps(data))
接收消息
# 接收消息(使其暂时不可见)
messages = queue_client.receive_messages(
messages_per_page=10,
visibility_timeout=30 # 30 秒处理时间
)
for message in messages:
print(f"ID: {message.id}")
print(f"Content: {message.content}")
print(f"Dequeue count: {message.dequeue_count}")
# 处理消息...
# 处理后删除
queue_client.delete_message(message)
查看消息
# 查看但不隐藏(不影响可见性)
messages = queue_client.peek_messages(max_messages=5)
for message in messages:
print(message.content)
更新消息
# 延长可见性或更新内容
messages = queue_client.receive_messages()
for message in messages:
# 延长超时(需要更多时间)
queue_client.update_message(
message,
visibility_timeout=60
)
# 更新内容和超时
queue_client.update_message(
message,
content="Updated content",
visibility_timeout=60
)
删除消息
# 处理成功后删除
messages = queue_client.receive_messages()
for message in messages:
try:
# 处理...
queue_client.delete_message(message)
except Exception:
# 超时后消息重新变为可见
pass
清空队列
# 删除所有消息
queue_client.clear_messages()
队列属性
# 获取队列属性
properties = queue_client.get_queue_properties()
print(f"Approximate message count: {properties.approximate_message_count}")
# 设置/获取元数据
queue_client.set_queue_metadata(metadata={"environment": "production"})
properties = queue_client.get_queue_properties()
print(properties.metadata)
异步客户端
from azure.storage.queue.aio import QueueServiceClient, QueueClient
from azure.identity.aio import DefaultAzureCredential
async def queue_operations():
credential = DefaultAzureCredential()
async with QueueClient(
account_url="https://<account>.queue.core.windows.net",
queue_name="myqueue",
credential=credential
) as client:
# 发送
await client.send_message("Async message")
# 接收
async for message in client.receive_messages():
print(message.content)
await client.delete_message(message)
import asyncio
asyncio.run(queue_operations())
Base64 编码
from azure.storage.queue import QueueClient, BinaryBase64EncodePolicy, BinaryBase64DecodePolicy
# 用于二进制数据
queue_client = QueueClient(
account_url=account_url,
queue_name="myqueue",
credential=credential,
message_encode_policy=BinaryBase64EncodePolicy(),
message_decode_policy=BinaryBase64DecodePolicy()
)
# 发送字节
queue_client.send_message(b"Binary content")
最佳实践
- 处理后删除消息 以防止重复处理
- 设置适当的可见性超时 基于处理时间
- 处理 dequeue_count 用于毒消息检测
- 使用异步客户端 用于高吞吐量场景
- 使用 peek_messages 用于监控而不影响队列
- 设置 time_to_live 防止过期消息
- 考虑 Service Bus 用于需要更高级功能的场景
兼容工具
Claude CodeCursor
标签
后端开发
