第十一章:缓存实战
深入了解 Redis 缓存实战技巧,包括缓存策略、缓存问题和解决方案。
最后更新: 2024-01-15
页面目录
Redis 缓存实战
缓存是 Redis 最常见的应用场景。本章介绍各种缓存策略和常见问题的解决方案。
缓存模式
┌─────────────────────────────────────────────────────────────────┐
│ 缓存读写模式 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Cache-Aside │ │Write-Through│ │Write-Behind│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Read: Write: Write: │
│ App → Cache App → Cache → DB │
│ ↓ DB 同步更新 异步写 │
│ ↓ ↓ │
│ Miss → DB Cache miss → DB DB │
│ │
└─────────────────────────────────────────────────────────────────┘
Cache-Aside 模式
原理
读操作:
1. 应用程序先查缓存
2. 缓存命中 → 返回数据
3. 缓存未命中 → 查数据库 → 写入缓存 → 返回数据
写操作:
1. 应用程序写数据库
2. 删除缓存
Python 实现
import redis
import json
client = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_user(user_id):
"""Cache-Aside 读操作"""
cache_key = f"user:{user_id}"
# 1. 查缓存
cached = client.get(cache_key)
if cached:
return json.loads(cached)
# 2. 缓存未命中,查数据库
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
# 3. 写入缓存
client.setex(cache_key, 3600, json.dumps(user))
return user
def update_user(user_id, data):
"""Cache-Aside 写操作"""
# 1. 写数据库
db.execute("UPDATE users SET ... WHERE id = ?", user_id, data)
# 2. 删除缓存
client.delete(f"user:{user_id}")
Write-Through 模式
原理
写操作:
1. 应用程序写缓存
2. 缓存同步写数据库
读操作与 Cache-Aside 相同
Python 实现
def set_user(user_id, data):
"""Write-Through 写操作"""
cache_key = f"user:{user_id}"
# 1. 写缓存
client.setex(cache_key, 3600, json.dumps(data))
# 2. 同步写数据库
db.execute("INSERT INTO users ...", data)
return data
def get_user(user_id):
"""Write-Through 读操作"""
return get_user_cached(user_id) # 与 Cache-Aside 相同
缓存过期策略
TTL 设置
# 不同数据设置不同 TTL
CACHE_TTL = {
'user_profile': 3600, # 用户信息:1小时
'product_info': 7200, # 商品信息:2小时
'hot_product': 300, # 热门商品:5分钟
'session': 1800, # 会话:30分钟
'config': 86400, # 配置:1天
}
def cache_set(key, value, ttl):
client.setex(key, ttl, json.dumps(value))
def cache_get(key):
cached = client.get(key)
return json.loads(cached) if cached else None
过期策略选择
| 数据类型 | 推荐 TTL | 说明 |
|---|---|---|
| 用户信息 | 1-24h | 变更不频繁 |
| 会话 | 30min-2h | 与登录超时相关 |
| 配置 | 1-24h | 变更极少 |
| 热门数据 | 5-30min | 需要新鲜度 |
| 计数/统计 | 60s-5min | 需要实时性 |
缓存问题与解决方案
1. 缓存穿透
问题:大量请求查询不存在的数据
请求 → Redis(未命中) → MySQL(无数据) → 返回空
↑ 大量重复请求
解决方案
def get_user(user_id):
"""缓存穿透防护"""
cache_key = f"user:{user_id}"
cached = client.get(cache_key)
if cached:
return json.loads(cached)
# 查询数据库
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
if user:
# 正常数据缓存
client.setex(cache_key, 3600, json.dumps(user))
else:
# 空值缓存,防止穿透
client.setex(cache_key, 60, "")
return user
# 或者使用布隆过滤器
from bloom_filter import BloomFilter
bloom = BloomFilter(max_elements=100000, error_rate=0.01)
def get_user(user_id):
if user_id not in bloom:
return None # 布隆过滤器判定不存在
return get_user_from_db(user_id)
2. 缓存击穿
问题:热点 key 过期,瞬间大量请求击穿到数据库
时间 T: Hot Key 过期
时间 T+: 大量请求 → DB
解决方案:互斥锁
import time
import threading
def get_user_mutex(user_id):
"""互斥锁方案"""
cache_key = f"user:{user_id}"
# 先查缓存
cached = client.get(cache_key)
if cached:
return json.loads(cached)
# 获取锁
lock_key = f"lock:{cache_key}"
lock_acquired = client.set(lock_key, "1", nx=True, ex=10)
if lock_acquired:
try:
# 双检查
cached = client.get(cache_key)
if cached:
return json.loads(cached)
# 查数据库
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
if user:
client.setex(cache_key, 3600, json.dumps(user))
return user
finally:
client.delete(lock_key)
else:
# 等待后重试
time.sleep(0.1)
return get_user_mutex(user_id)
3. 缓存雪崩
问题:大量 key 同时过期,请求击穿到数据库
时间 T: 大量 Key 过期
时间 T+: 大量请求 → DB
解决方案
import random
def set_with_jitter(key, value, ttl):
"""添加随机抖动,防止雪崩"""
jitter = random.randint(0, int(ttl * 0.1)) # 10% 随机抖动
client.setex(key, ttl + jitter, json.dumps(value))
# 方案二:热点数据永不过期 + 后台更新
def get_user(user_id):
"""永不过期 + 后台更新"""
cache_key = f"user:{user_id}"
cached = client.get(cache_key)
if cached:
data = json.loads(cached)
# 检查是否需要更新
if data.get('expires_at') < time.time():
# 异步更新
update_cache_async(user_id)
return data
# 首次加载
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
data = {
'data': user,
'expires_at': time.time() + 3600
}
client.set(cache_key, json.dumps(data))
return data
# 方案三:多级缓存
CACHE_L1 = {} # 本地缓存
CACHE_L2 = redis # Redis 缓存
def get_user_l1_l2(user_id):
"""二级缓存"""
# L1 查找
if user_id in CACHE_L1:
return CACHE_L1[user_id]
# L2 查找
cache_key = f"user:{user_id}"
cached = CACHE_L2.get(cache_key)
if cached:
user = json.loads(cached)
CACHE_L1[user_id] = user # 写入 L1
return user
# 数据库查询
user = db.query(user_id)
CACHE_L1[user_id] = user
CACHE_L2.setex(cache_key, 3600, json.dumps(user))
return user
热点数据处理
热点 key 识别
import redis
import time
class HotKeyDetector:
def __init__(self):
self.client = redis.Redis(host='localhost', port=6379)
self.counter_key = "hotkey:counter"
self.window = 60 # 统计窗口(秒)
def record_access(self, key):
"""记录访问"""
pipe = self.client.pipeline()
pipe.zincrby(self.counter_key, 1, key)
pipe.expire(self.counter_key, self.window)
pipe.execute()
def get_hot_keys(self, top_n=100):
"""获取热点 key"""
return self.client.zrevrange(self.counter_key, 0, top_n - 1)
# 使用装饰器
def hot_key_monitor(func):
def wrapper(*args, **kwargs):
key = f"{func.__name__}:{args[0]}"
detector.record_access(key)
return func(*args, **kwargs)
return wrapper
热点数据预热
def preload_hot_data():
"""热点数据预热"""
# 从数据库加载热点数据
hot_users = db.query("SELECT * FROM users ORDER BY view_count DESC LIMIT 100")
pipe = client.pipeline()
for user in hot_users:
key = f"user:{user['id']}"
pipe.setex(key, 3600, json.dumps(user))
pipe.execute()
# 启动时预热
if __name__ == '__main__':
preload_hot_data()
分布式缓存
Redis Cluster 缓存
from redis.cluster import RedisCluster
cluster = RedisCluster(
host='127.0.0.1',
port=7000,
decode_responses=True
)
def get_product(product_id):
"""集群模式缓存"""
cache_key = f"product:{product_id}"
cached = cluster.get(cache_key)
if cached:
return json.loads(cached)
product = db.query("SELECT * FROM products WHERE id = ?", product_id)
if product:
cluster.setex(cache_key, 7200, json.dumps(product))
return product
Redis Sentinel 缓存
from redis.sentinel import Sentinel
sentinel = Sentinel([
('localhost', 26379),
('localhost', 26380),
], socket_timeout=0.1)
# 获取主节点写缓存
master = sentinel.master_for('mymaster')
# 获取从节点读缓存
slave = sentinel.slave_for('mymaster')
def write_cache(key, value, ttl):
"""写主节点"""
master.setex(key, ttl, json.dumps(value))
def read_cache(key):
"""读从节点"""
return slave.get(key)
缓存监控
监控指标
def monitor_cache():
"""缓存监控"""
info = client.info('stats')
print(f"键空间命中: {info['keyspace_hits']}")
print(f"键空间未命中: {info['keyspace_misses']}")
hits = info['keyspace_hits']
misses = info['keyspace_misses']
total = hits + misses
if total > 0:
hit_rate = hits / total * 100
print(f"命中率: {hit_rate:.2f}%")
# 监控 Big Key
big_keys = []
for key in client.scan_iter(count=1000):
size = client.memory_usage(key) or 0
if size > 10 * 1024 * 1024: # > 10MB
big_keys.append((key, size))
print(f"Big Keys: {big_keys}")
告警配置
def cache_health_check():
"""缓存健康检查"""
alerts = []
# 检查命中率
info = client.info('stats')
hit_rate = info['keyspace_hits'] / (info['keyspace_hits'] + info['keyspace_misses'])
if hit_rate < 0.8:
alerts.append(f"缓存命中率过低: {hit_rate:.2%}")
# 检查内存使用
mem_info = client.info('memory')
if mem_info['used_memory'] > mem_info['maxmemory'] * 0.8:
alerts.append("内存使用超过80%")
# 检查连接数
clients = client.info('clients')
if clients['connected_clients'] > 1000:
alerts.append("连接数过高")
return alerts
最佳实践
代码规范
# ✅ 推荐:统一封装
class CacheManager:
def __init__(self, client):
self.client = client
def get(self, key, deserializer=json.loads):
cached = self.client.get(key)
return deserializer(cached) if cached else None
def set(self, key, value, ttl=3600, serializer=json.dumps):
self.client.setex(key, ttl, serializer(value))
def delete(self, key):
self.client.delete(key)
def exists(self, key):
return self.client.exists(key)
# ✅ 推荐:使用连接池
pool = redis.ConnectionPool(host='localhost', port=6379, max_connections=50)
cache = CacheManager(redis.Redis(connection_pool=pool))
# ❌ 避免:硬编码
client = redis.Redis(host='localhost', port=6379)
键命名规范
# 键命名格式:业务:类型:标识
# 示例:
# user:profile:1001 # 用户信息
# product:detail:2001 # 商品详情
# order:info:3001 # 订单信息
# session:user:abc123 # 会话
# lock:order:12345 # 分布式锁
# cache:hot:products # 热门商品
# ✅ 推荐:使用冒号分隔
# ❌ 避免:使用下划线或其他分隔符
下一步
接下来让我们学习 Redis 分布式锁。
👉 分布式锁