第十一章:缓存实战

深入了解 Redis 缓存实战技巧,包括缓存策略、缓存问题和解决方案。

最后更新: 2024-01-15
页面目录

Redis 缓存实战

缓存是 Redis 最常见的应用场景。本章介绍各种缓存策略和常见问题的解决方案。

缓存模式

┌─────────────────────────────────────────────────────────────────┐
│                      缓存读写模式                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────────┐     ┌─────────────┐     ┌─────────────┐      │
│   │  Cache-Aside │     │Write-Through│     │Write-Behind│      │
│   └─────────────┘     └─────────────┘     └─────────────┘      │
│                                                                  │
│   Read:           Write:           Write:                        │
│   App → Cache     App → Cache → DB                             │
│      ↓            DB 同步更新      异步写                        │
│      ↓                              ↓                           │
│   Miss → DB       Cache miss → DB    DB                         │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Cache-Aside 模式

原理

读操作:
1. 应用程序先查缓存
2. 缓存命中 → 返回数据
3. 缓存未命中 → 查数据库 → 写入缓存 → 返回数据

写操作:
1. 应用程序写数据库
2. 删除缓存

Python 实现

import redis
import json

client = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user(user_id):
    """Cache-Aside 读操作"""
    cache_key = f"user:{user_id}"
    
    # 1. 查缓存
    cached = client.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # 2. 缓存未命中,查数据库
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    
    # 3. 写入缓存
    client.setex(cache_key, 3600, json.dumps(user))
    
    return user

def update_user(user_id, data):
    """Cache-Aside 写操作"""
    # 1. 写数据库
    db.execute("UPDATE users SET ... WHERE id = ?", user_id, data)
    
    # 2. 删除缓存
    client.delete(f"user:{user_id}")

Write-Through 模式

原理

写操作:
1. 应用程序写缓存
2. 缓存同步写数据库

读操作与 Cache-Aside 相同

Python 实现

def set_user(user_id, data):
    """Write-Through 写操作"""
    cache_key = f"user:{user_id}"
    
    # 1. 写缓存
    client.setex(cache_key, 3600, json.dumps(data))
    
    # 2. 同步写数据库
    db.execute("INSERT INTO users ...", data)
    
    return data

def get_user(user_id):
    """Write-Through 读操作"""
    return get_user_cached(user_id)  # 与 Cache-Aside 相同

缓存过期策略

TTL 设置

# 不同数据设置不同 TTL
CACHE_TTL = {
    'user_profile': 3600,      # 用户信息:1小时
    'product_info': 7200,     # 商品信息:2小时
    'hot_product': 300,        # 热门商品:5分钟
    'session': 1800,           # 会话:30分钟
    'config': 86400,          # 配置:1天
}

def cache_set(key, value, ttl):
    client.setex(key, ttl, json.dumps(value))

def cache_get(key):
    cached = client.get(key)
    return json.loads(cached) if cached else None

过期策略选择

数据类型 推荐 TTL 说明
用户信息 1-24h 变更不频繁
会话 30min-2h 与登录超时相关
配置 1-24h 变更极少
热门数据 5-30min 需要新鲜度
计数/统计 60s-5min 需要实时性

缓存问题与解决方案

1. 缓存穿透

问题:大量请求查询不存在的数据

请求 → Redis(未命中) → MySQL(无数据) → 返回空
     ↑ 大量重复请求

解决方案

def get_user(user_id):
    """缓存穿透防护"""
    cache_key = f"user:{user_id}"
    
    cached = client.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # 查询数据库
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    
    if user:
        # 正常数据缓存
        client.setex(cache_key, 3600, json.dumps(user))
    else:
        # 空值缓存,防止穿透
        client.setex(cache_key, 60, "")
    
    return user

# 或者使用布隆过滤器
from bloom_filter import BloomFilter

bloom = BloomFilter(max_elements=100000, error_rate=0.01)

def get_user(user_id):
    if user_id not in bloom:
        return None  # 布隆过滤器判定不存在
    return get_user_from_db(user_id)

2. 缓存击穿

问题:热点 key 过期,瞬间大量请求击穿到数据库

时间 T:  Hot Key 过期
时间 T+: 大量请求 → DB

解决方案:互斥锁

import time
import threading

def get_user_mutex(user_id):
    """互斥锁方案"""
    cache_key = f"user:{user_id}"
    
    # 先查缓存
    cached = client.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # 获取锁
    lock_key = f"lock:{cache_key}"
    lock_acquired = client.set(lock_key, "1", nx=True, ex=10)
    
    if lock_acquired:
        try:
            # 双检查
            cached = client.get(cache_key)
            if cached:
                return json.loads(cached)
            
            # 查数据库
            user = db.query("SELECT * FROM users WHERE id = ?", user_id)
            
            if user:
                client.setex(cache_key, 3600, json.dumps(user))
            
            return user
        finally:
            client.delete(lock_key)
    else:
        # 等待后重试
        time.sleep(0.1)
        return get_user_mutex(user_id)

3. 缓存雪崩

问题:大量 key 同时过期,请求击穿到数据库

时间 T:  大量 Key 过期
时间 T+: 大量请求 → DB

解决方案

import random

def set_with_jitter(key, value, ttl):
    """添加随机抖动,防止雪崩"""
    jitter = random.randint(0, int(ttl * 0.1))  # 10% 随机抖动
    client.setex(key, ttl + jitter, json.dumps(value))

# 方案二:热点数据永不过期 + 后台更新
def get_user(user_id):
    """永不过期 + 后台更新"""
    cache_key = f"user:{user_id}"
    
    cached = client.get(cache_key)
    if cached:
        data = json.loads(cached)
        # 检查是否需要更新
        if data.get('expires_at') < time.time():
            # 异步更新
            update_cache_async(user_id)
        return data
    
    # 首次加载
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    data = {
        'data': user,
        'expires_at': time.time() + 3600
    }
    client.set(cache_key, json.dumps(data))
    return data

# 方案三:多级缓存
CACHE_L1 = {}  # 本地缓存
CACHE_L2 = redis  # Redis 缓存

def get_user_l1_l2(user_id):
    """二级缓存"""
    # L1 查找
    if user_id in CACHE_L1:
        return CACHE_L1[user_id]
    
    # L2 查找
    cache_key = f"user:{user_id}"
    cached = CACHE_L2.get(cache_key)
    if cached:
        user = json.loads(cached)
        CACHE_L1[user_id] = user  # 写入 L1
        return user
    
    # 数据库查询
    user = db.query(user_id)
    CACHE_L1[user_id] = user
    CACHE_L2.setex(cache_key, 3600, json.dumps(user))
    return user

热点数据处理

热点 key 识别

import redis
import time

class HotKeyDetector:
    def __init__(self):
        self.client = redis.Redis(host='localhost', port=6379)
        self.counter_key = "hotkey:counter"
        self.window = 60  # 统计窗口(秒)
    
    def record_access(self, key):
        """记录访问"""
        pipe = self.client.pipeline()
        pipe.zincrby(self.counter_key, 1, key)
        pipe.expire(self.counter_key, self.window)
        pipe.execute()
    
    def get_hot_keys(self, top_n=100):
        """获取热点 key"""
        return self.client.zrevrange(self.counter_key, 0, top_n - 1)

# 使用装饰器
def hot_key_monitor(func):
    def wrapper(*args, **kwargs):
        key = f"{func.__name__}:{args[0]}"
        detector.record_access(key)
        return func(*args, **kwargs)
    return wrapper

热点数据预热

def preload_hot_data():
    """热点数据预热"""
    # 从数据库加载热点数据
    hot_users = db.query("SELECT * FROM users ORDER BY view_count DESC LIMIT 100")
    
    pipe = client.pipeline()
    for user in hot_users:
        key = f"user:{user['id']}"
        pipe.setex(key, 3600, json.dumps(user))
    pipe.execute()

# 启动时预热
if __name__ == '__main__':
    preload_hot_data()

分布式缓存

Redis Cluster 缓存

from redis.cluster import RedisCluster

cluster = RedisCluster(
    host='127.0.0.1',
    port=7000,
    decode_responses=True
)

def get_product(product_id):
    """集群模式缓存"""
    cache_key = f"product:{product_id}"
    
    cached = cluster.get(cache_key)
    if cached:
        return json.loads(cached)
    
    product = db.query("SELECT * FROM products WHERE id = ?", product_id)
    if product:
        cluster.setex(cache_key, 7200, json.dumps(product))
    
    return product

Redis Sentinel 缓存

from redis.sentinel import Sentinel

sentinel = Sentinel([
    ('localhost', 26379),
    ('localhost', 26380),
], socket_timeout=0.1)

# 获取主节点写缓存
master = sentinel.master_for('mymaster')

# 获取从节点读缓存
slave = sentinel.slave_for('mymaster')

def write_cache(key, value, ttl):
    """写主节点"""
    master.setex(key, ttl, json.dumps(value))

def read_cache(key):
    """读从节点"""
    return slave.get(key)

缓存监控

监控指标

def monitor_cache():
    """缓存监控"""
    info = client.info('stats')
    
    print(f"键空间命中: {info['keyspace_hits']}")
    print(f"键空间未命中: {info['keyspace_misses']}")
    
    hits = info['keyspace_hits']
    misses = info['keyspace_misses']
    total = hits + misses
    
    if total > 0:
        hit_rate = hits / total * 100
        print(f"命中率: {hit_rate:.2f}%")
    
    # 监控 Big Key
    big_keys = []
    for key in client.scan_iter(count=1000):
        size = client.memory_usage(key) or 0
        if size > 10 * 1024 * 1024:  # > 10MB
            big_keys.append((key, size))
    
    print(f"Big Keys: {big_keys}")

告警配置

def cache_health_check():
    """缓存健康检查"""
    alerts = []
    
    # 检查命中率
    info = client.info('stats')
    hit_rate = info['keyspace_hits'] / (info['keyspace_hits'] + info['keyspace_misses'])
    if hit_rate < 0.8:
        alerts.append(f"缓存命中率过低: {hit_rate:.2%}")
    
    # 检查内存使用
    mem_info = client.info('memory')
    if mem_info['used_memory'] > mem_info['maxmemory'] * 0.8:
        alerts.append("内存使用超过80%")
    
    # 检查连接数
    clients = client.info('clients')
    if clients['connected_clients'] > 1000:
        alerts.append("连接数过高")
    
    return alerts

最佳实践

代码规范

# ✅ 推荐:统一封装
class CacheManager:
    def __init__(self, client):
        self.client = client
    
    def get(self, key, deserializer=json.loads):
        cached = self.client.get(key)
        return deserializer(cached) if cached else None
    
    def set(self, key, value, ttl=3600, serializer=json.dumps):
        self.client.setex(key, ttl, serializer(value))
    
    def delete(self, key):
        self.client.delete(key)
    
    def exists(self, key):
        return self.client.exists(key)

# ✅ 推荐:使用连接池
pool = redis.ConnectionPool(host='localhost', port=6379, max_connections=50)
cache = CacheManager(redis.Redis(connection_pool=pool))

# ❌ 避免:硬编码
client = redis.Redis(host='localhost', port=6379)

键命名规范

# 键命名格式:业务:类型:标识
# 示例:
# user:profile:1001           # 用户信息
# product:detail:2001          # 商品详情
# order:info:3001              # 订单信息
# session:user:abc123          # 会话
# lock:order:12345             # 分布式锁
# cache:hot:products           # 热门商品

# ✅ 推荐:使用冒号分隔
# ❌ 避免:使用下划线或其他分隔符

下一步

接下来让我们学习 Redis 分布式锁。

👉 分布式锁