第十二章:分布式锁
深入了解 Redis 分布式锁的实现,包括基本锁、RedLock 算法和可重入锁。
最后更新: 2024-01-15
页面目录
Redis 分布式锁
分布式锁是分布式系统中控制并发访问的重要机制。本章详细介绍如何使用 Redis 实现分布式锁。
分布式锁概述
┌─────────────────────────────────────────────────────────────────┐
│ 分布式锁应用场景 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 分布式系统 │ │
│ │ │ │
│ │ Service A Service B Service C │ │
│ │ │ │ │ │ │
│ │ └─────────────┼─────────────┘ │ │
│ │ │ │ │
│ │ ┌──────┴──────┐ │ │
│ │ │ Redis Lock │ │ │
│ │ │ 互斥访问 │ │ │
│ │ └─────────────┘ │ │
│ │ │ │ │
│ │ ┌──────┴──────┐ │ │
│ │ │ Resource │ │ │
│ │ │ 临界资源 │ │ │
│ │ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
锁的应用场景
| 场景 | 说明 |
|---|---|
| 订单处理 | 防止重复下单 |
| 库存扣减 | 防止超卖 |
| 幂等性控制 | 确保接口幂等 |
| 任务调度 | 避免重复执行 |
| 资源分配 | 限流、连接池 |
基本实现
加锁
# 简单加锁(不推荐)
SET lock:order:12345 "1" NX EX 30
# 参数说明:
# NX: 不存在时设置
# EX 30: 30秒过期
解锁
# 简单解锁(不原子,有风险)
GET lock:order:12345
DEL lock:order:12345
# 问题:如果在 GET 和 DEL 之间锁过期被其他客户端获取,会错误删除别人的锁
原子解锁(Lua 脚本)
-- 解锁脚本
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
Python 实现
简单分布式锁
import redis
import time
import uuid
class RedisLock:
def __init__(self, client, lock_name, timeout=30):
self.client = client
self.lock_name = f"lock:{lock_name}"
self.timeout = timeout
self.token = str(uuid.uuid4()) # 唯一标识
def acquire(self):
"""获取锁"""
return self.client.set(
self.lock_name,
self.token,
nx=True,
ex=self.timeout
)
def release(self):
"""释放锁"""
# Lua 脚本保证原子性
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
self.client.eval(script, 1, self.lock_name, self.token)
def extend(self, additional_time):
"""延长锁时间(可重入锁)"""
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
"""
return self.client.eval(
script, 1,
self.lock_name,
self.token,
self.timeout + additional_time
)
def __enter__(self):
if not self.acquire():
raise RuntimeError("Failed to acquire lock")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
return False
# 使用示例
client = redis.Redis(host='localhost', port=6379)
with RedisLock(client, "order:12345") as lock:
# 业务逻辑
process_order()
带重试的锁
import time
import threading
class RedisLockWithRetry:
def __init__(self, client, lock_name, timeout=30,
retry_count=3, retry_delay=0.1):
self.lock = RedisLock(client, lock_name, timeout)
self.retry_count = retry_count
self.retry_delay = retry_delay
def acquire(self):
"""带重试的获取锁"""
for _ in range(self.retry_count):
if self.lock.acquire():
return True
time.sleep(self.retry_delay)
return False
def release(self):
"""释放锁"""
self.lock.release()
def __enter__(self):
if not self.acquire():
raise RuntimeError("Failed to acquire lock after retries")
return self
def __exit__(self, *args):
self.release()
# 使用示例
with RedisLockWithRetry(client, "order:12345", retry_count=5) as lock:
process_order()
RedLock 算法
RedLock 是更可靠的分布式锁算法,需要多个独立的 Redis 实例。
算法原理
┌─────────────────────────────────────────────────────────────────┐
│ RedLock 算法原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Client │
│ │ │
│ │ 1. 获取当前时间(ms) │
│ │ │
│ ├──────────────────────────────────────────────────────► │
│ │ │
│ │ 2. 依次向 N 个 Redis 实例获取锁 │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ │ Redis 1 │ │ Redis 2 │ │ Redis 3 │ │
│ │ │ ✓ 获取 │ │ ✓ 获取 │ │ ✗ 超时 │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │
│ │ │
│ │ 3. 计算获取锁花费的时间 │
│ │ │
│ │ 4. 判断是否获取成功: │
│ │ • 超过 N/2+1 个实例成功 │
│ │ • 总花费时间 < 锁过期时间 │
│ │ │
│ │ 5. 如果成功,有效时间 = 初始时间 - 获取时间 │
│ │ │
│ │ 6. 失败时,释放所有实例的锁 │
│ │
└─────────────────────────────────────────────────────────────────┘
Python 实现
import time
import uuid
from typing import List
class RedLock:
def __init__(self, client_instances: List[redis.Redis],
retry_count=3, retry_delay=0.2,
clock_drifting_factor=0.01):
"""
RedLock 实现
Args:
client_instances: Redis 客户端列表
retry_count: 重试次数
retry_delay: 重试间隔
clock_drifting_factor: 时钟漂移因子
"""
self.clients = client_instances
self.retry_count = retry_count
self.retry_delay = retry_delay
self.clock_drifting_factor = clock_drifting_factor
self.quorum = len(client_instances) // 2 + 1
self.lock_script = """
if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
end
return 0
"""
self.unlock_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
def lock(self, resource: str, ttl_ms: int = 30000) -> dict:
"""
获取锁
Args:
resource: 锁的资源名
ttl_ms: 锁过期时间(毫秒)
Returns:
成功: {'validity': True, 'token': 'xxx'}
失败: {'validity': False, 'token': None}
"""
token = str(uuid.uuid4())
start_time = time.time()
for _ in range(self.retry_count):
acquired = []
for client in self.clients:
try:
result = client.eval(
self.lock_script, 1,
f"lock:{resource}",
token,
ttl_ms
)
if result == 1:
acquired.append(True)
except Exception:
acquired.append(False)
# 检查是否获得多数票
acquired_count = sum(acquired)
elapsed_ms = (time.time() - start_time) * 1000
# 时钟漂移补偿
drift = int(ttl_ms * self.clock_drifting_factor) + 2
validity_time = ttl_ms - elapsed_ms - drift
if acquired_count >= self.quorum and validity_time > 0:
return {
'validity': True,
'token': token,
'validity_ms': validity_time,
'servers': acquired_count
}
# 失败,释放所有锁
for client in self.clients:
try:
client.eval(
self.unlock_script, 1,
f"lock:{resource}",
token
)
except Exception:
pass
time.sleep(self.retry_delay)
return {'validity': False, 'token': None}
def unlock(self, resource: str, token: str):
"""释放锁"""
for client in self.clients:
try:
client.eval(
self.unlock_script, 1,
f"lock:{resource}",
token
)
except Exception:
pass
# 使用示例
clients = [
redis.Redis(host='redis1', port=6379, decode_responses=True),
redis.Redis(host='redis2', port=6379, decode_responses=True),
redis.Redis(host='redis3', port=6379, decode_responses=True),
]
lock_manager = RedLock(clients)
result = lock_manager.lock("order:12345", ttl_ms=30000)
if result['validity']:
try:
process_order()
finally:
lock_manager.unlock("order:12345", result['token'])
可重入锁
实现原理
┌─────────────────────────────────────────────────────────────────┐
│ 可重入锁原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 锁持有者追踪 │ │
│ │ │ │
│ │ lock:order:123 ──► {token: count} │ │
│ │ └──> UUID1: 3 │ │
│ │ │ │
│ │ 同一个线程多次获取锁,增加计数 │ │
│ │ 释放锁时减少计数,计数为0时删除锁 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Python 实现
class ReentrantLock:
def __init__(self, client, lock_name, timeout=30):
self.client = client
self.lock_name = f"rlock:{lock_name}"
self.timeout = timeout
self.token = str(uuid.uuid4())
self.count = 0
self.lock_key = f"rlock:{lock_name}:{self.token}"
def acquire(self) -> bool:
"""获取锁"""
if self.count > 0:
# 重入:增加计数
self.count += 1
return True
# 首次获取
acquired = self.client.set(
self.lock_name,
self.token,
nx=True,
ex=self.timeout
)
if acquired:
self.count = 1
# 存储锁计数
self.client.set(self.lock_key, 1, ex=self.timeout)
return acquired
def release(self):
"""释放锁"""
if self.count <= 0:
return
self.count -= 1
if self.count == 0:
# 释放锁
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
self.client.eval(script, 1, self.lock_name, self.token)
self.client.delete(self.lock_key)
def __enter__(self):
if not self.acquire():
raise RuntimeError("Failed to acquire lock")
return self
def __exit__(self, *args):
self.release()
return False
# 使用示例
with ReentrantLock(client, "resource") as lock:
with ReentrantLock(client, "resource") as lock2: # 重入
# 同一线程可以多次获取
access_resource()
常见问题
1. 锁过期但任务未完成
def process_with_lock(client, task_id):
"""
锁过期但任务未完成的处理
方案:使用看门狗自动续期
"""
lock = RedisLock(client, f"task:{task_id}", timeout=30)
# 后台线程续期
def watchdog():
while True:
time.sleep(10)
if lock.token:
lock.extend(30)
# 启动续期线程
import threading
thread = threading.Thread(target=watchdog, daemon=True)
thread.start()
try:
lock.acquire()
execute_task(task_id)
finally:
lock.release()
2. 分布式环境下时钟不同步
# 使用 RedLock 减轻时钟问题
# 或使用 Redis 实例时间而非本地时间
3. 主从切换导致锁丢失
# 使用 RedLock 或 Redisson
# Redisson 内置了完善的分布式锁实现
使用场景示例
订单处理
def process_order(order_id):
"""处理订单,防止重复处理"""
lock_key = f"order:process:{order_id}"
with RedisLock(client, lock_key, timeout=60) as lock:
# 检查订单状态
order = get_order(order_id)
if order.status != 'pending':
return
# 处理订单
process_payment(order)
update_order_status(order_id, 'completed')
库存扣减
def deduct_stock(product_id, quantity):
"""扣减库存,防止超卖"""
lock_key = f"stock:lock:{product_id}"
with RedisLock(client, lock_key, timeout=10) as lock:
stock = int(client.get(f"stock:{product_id}") or 0)
if stock < quantity:
raise ValueError("库存不足")
new_stock = stock - quantity
client.set(f"stock:{product_id}", new_stock)
return new_stock
定时任务协调
def scheduled_task(task_name):
"""定时任务,确保单节点执行"""
lock_key = f"scheduler:{task_name}"
with RedisLock(client, lock_key, timeout=3600) as lock:
execute_scheduled_task(task_name)
下一步
接下来让我们学习 Redis Lua 脚本。
👉 Lua 脚本