
关于
Redis 数据结构模式、缓存策略、分布式锁、速率限制、发布/订阅和生产应用的连接管理。
name: redis-patterns description: Redis 数据结构模式、缓存策略、分布式锁、速率限制、发布/订阅以及生产应用的连接管理。 origin: ECC
Redis 模式
Redis 常见后端用例的最佳实践快速参考。
工作原理
Redis 是一个内存数据结构存储,支持字符串、哈希、列表、集合、有序集合、流等多种数据类型。单个 Redis 命令在单实例上是原子性的;多步骤工作流需要 Lua 脚本、MULTI/EXEC 事务或显式同步来保持原子性。数据可通过 RDB 快照或 AOF 日志进行可选持久化。客户端通过 TCP 使用 RESP 协议通信;连接池对于避免每次请求的握手开销至关重要。
何时激活
- 为应用添加缓存
- 实现速率限制或节流
- 构建分布式锁或协调机制
- 设置会话或令牌存储
- 使用 Pub/Sub 或 Redis Streams 进行消息传递
- 在生产环境配置 Redis(连接池、淘汰策略、集群)
数据结构速查表
| 用例 | 结构 | 示例键 |
|------|------|--------|
| 简单缓存 | String | product:123 |
| 用户会话 | Hash | session:abc |
| 排行榜 | Sorted Set | scores:weekly |
| 唯一访客 | Set | visitors:2024-01-01 |
| 动态流 | List | feed:user:456 |
| 事件流 | Stream | events:orders |
| 计数器/速率限制 | String (INCR) | ratelimit:user:123 |
| 布隆过滤器/HLL | HyperLogLog | hll:pageviews |
核心模式
Cache-Aside(延迟加载)
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_product(product_id: int):
cache_key = f"product:{product_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
product = db.query("SELECT * FROM products WHERE id = %s", product_id)
r.setex(cache_key, 3600, json.dumps(product)) # TTL: 1 hour
return product
Write-Through 缓存
def update_product(product_id: int, data: dict):
# Write to DB first
db.execute("UPDATE products SET ... WHERE id = %s", product_id)
# Immediately update cache
cache_key = f"product:{product_id}"
r.setex(cache_key, 3600, json.dumps(data))
缓存失效
# Tag-based invalidation — group related keys under a set
def cache_product(product_id: int, category_id: int, data: dict):
key = f"product:{product_id}"
tag = f"tag:category:{category_id}"
pipe = r.pipeline(transaction=True)
pipe.setex(key, 3600, json.dumps(data))
pipe.sadd(tag, key)
pipe.expire(tag, 3600)
pipe.execute()
def invalidate_category(category_id: int):
tag = f"tag:category:{category_id}"
keys = r.smembers(tag)
if keys:
r.delete(*keys)
r.delete(tag)
会话存储
import time
import uuid
def create_session(user_id: int, ttl: int = 86400) -> str:
session_id = str(uuid.uuid4())
key = f"session:{session_id}"
pipe = r.pipeline(transaction=True)
pipe.hset(key, mapping={
"user_id": user_id,
"created_at": int(time.time()),
})
pipe.expire(key, ttl)
pipe.execute()
return session_id
def get_session(session_id: str) -> dict | None:
data = r.hgetall(f"session:{session_id}")
return data if data else None
def delete_session(session_id: str):
r.delete(f"session:{session_id}")
速率限制
固定窗口(简单)
def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
key = f"ratelimit:{user_id}:{int(time.time()) // window}"
pipe = r.pipeline(transaction=True)
pipe.incr(key)
pipe.expire(key, window)
count, _ = pipe.execute()
return count > limit
滑动窗口(Lua — 原子操作)
-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
-- Use unique member (now + sequence) to avoid collisions within the same millisecond
local seq_key = key .. ':seq'
local seq = redis.call('INCR', seq_key)
redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
redis.call('ZADD', key, now, now .. '-' .. seq)
redis.call('EXPIRE', key, math.ceil(window / 1000))
return 1
end
return 0
sliding_window = r.register_script(open('sliding_window.lua').read())
def allow_request(user_id: int) -> bool:
key = f"ratelimit:sliding:{user_id}"
now = int(time.time() * 1000)
return bool(sliding_window(keys=[key], args=[now, 60000, 100]))
分布式锁
分布式锁(单节点 — SET NX PX)
import uuid
def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
lock_key = f"lock:{resource}"
token = str(uuid.uuid4())
acquired = r.set(lock_key, token, nx=True, px=ttl_ms)
return token if acquired else None
兼容工具
Claude CodeCursor
标签
前端开发
