第十二章:分布式锁

深入了解 Redis 分布式锁的实现,包括基本锁、RedLock 算法和可重入锁。

最后更新: 2024-01-15
页面目录

Redis 分布式锁

分布式锁是分布式系统中控制并发访问的重要机制。本章详细介绍如何使用 Redis 实现分布式锁。

分布式锁概述

┌─────────────────────────────────────────────────────────────────┐
│                    分布式锁应用场景                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                    分布式系统                           │   │
│   │                                                         │   │
│   │   Service A     Service B     Service C                │   │
│   │       │             │             │                     │   │
│   │       └─────────────┼─────────────┘                     │   │
│   │                     │                                   │   │
│   │              ┌──────┴──────┐                            │   │
│   │              │  Redis Lock  │                            │   │
│   │              │   互斥访问   │                            │   │
│   │              └─────────────┘                            │   │
│   │                     │                                   │   │
│   │              ┌──────┴──────┐                            │   │
│   │              │   Resource   │                            │   │
│   │              │   临界资源   │                            │   │
│   │              └─────────────┘                            │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

锁的应用场景

场景 说明
订单处理 防止重复下单
库存扣减 防止超卖
幂等性控制 确保接口幂等
任务调度 避免重复执行
资源分配 限流、连接池

基本实现

加锁

# 简单加锁(不推荐)
SET lock:order:12345 "1" NX EX 30

# 参数说明:
# NX: 不存在时设置
# EX 30: 30秒过期

解锁

# 简单解锁(不原子,有风险)
GET lock:order:12345
DEL lock:order:12345

# 问题:如果在 GET 和 DEL 之间锁过期被其他客户端获取,会错误删除别人的锁

原子解锁(Lua 脚本)

-- 解锁脚本
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end

Python 实现

简单分布式锁

import redis
import time
import uuid

class RedisLock:
    def __init__(self, client, lock_name, timeout=30):
        self.client = client
        self.lock_name = f"lock:{lock_name}"
        self.timeout = timeout
        self.token = str(uuid.uuid4())  # 唯一标识
    
    def acquire(self):
        """获取锁"""
        return self.client.set(
            self.lock_name, 
            self.token, 
            nx=True, 
            ex=self.timeout
        )
    
    def release(self):
        """释放锁"""
        # Lua 脚本保证原子性
        script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        self.client.eval(script, 1, self.lock_name, self.token)
    
    def extend(self, additional_time):
        """延长锁时间(可重入锁)"""
        script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('expire', KEYS[1], ARGV[2])
        else
            return 0
        end
        """
        return self.client.eval(
            script, 1, 
            self.lock_name, 
            self.token, 
            self.timeout + additional_time
        )
    
    def __enter__(self):
        if not self.acquire():
            raise RuntimeError("Failed to acquire lock")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False


# 使用示例
client = redis.Redis(host='localhost', port=6379)

with RedisLock(client, "order:12345") as lock:
    # 业务逻辑
    process_order()

带重试的锁

import time
import threading

class RedisLockWithRetry:
    def __init__(self, client, lock_name, timeout=30, 
                 retry_count=3, retry_delay=0.1):
        self.lock = RedisLock(client, lock_name, timeout)
        self.retry_count = retry_count
        self.retry_delay = retry_delay
    
    def acquire(self):
        """带重试的获取锁"""
        for _ in range(self.retry_count):
            if self.lock.acquire():
                return True
            time.sleep(self.retry_delay)
        return False
    
    def release(self):
        """释放锁"""
        self.lock.release()
    
    def __enter__(self):
        if not self.acquire():
            raise RuntimeError("Failed to acquire lock after retries")
        return self
    
    def __exit__(self, *args):
        self.release()


# 使用示例
with RedisLockWithRetry(client, "order:12345", retry_count=5) as lock:
    process_order()

RedLock 算法

RedLock 是更可靠的分布式锁算法,需要多个独立的 Redis 实例。

算法原理

┌─────────────────────────────────────────────────────────────────┐
│                    RedLock 算法原理                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Client                                                         │
│     │                                                          │
│     │ 1. 获取当前时间(ms)                                        │
│     │                                                          │
│     ├──────────────────────────────────────────────────────►    │
│     │                                                           │
│     │ 2. 依次向 N 个 Redis 实例获取锁                           │
│     │    ┌─────────┐ ┌─────────┐ ┌─────────┐                   │
│     │    │ Redis 1 │ │ Redis 2 │ │ Redis 3 │                   │
│     │    │  ✓ 获取  │ │  ✓ 获取  │ │  ✗ 超时  │                   │
│     │    └─────────┘ └─────────┘ └─────────┘                   │
│     │                                                           │
│     │ 3. 计算获取锁花费的时间                                   │
│     │                                                          │
│     │ 4. 判断是否获取成功:                                     │
│     │    • 超过 N/2+1 个实例成功                                │
│     │    • 总花费时间 < 锁过期时间                              │
│     │                                                          │
│     │ 5. 如果成功,有效时间 = 初始时间 - 获取时间                │
│     │                                                          │
│     │ 6. 失败时,释放所有实例的锁                               │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Python 实现

import time
import uuid
from typing import List

class RedLock:
    def __init__(self, client_instances: List[redis.Redis], 
                 retry_count=3, retry_delay=0.2,
                 clock_drifting_factor=0.01):
        """
        RedLock 实现
        
        Args:
            client_instances: Redis 客户端列表
            retry_count: 重试次数
            retry_delay: 重试间隔
            clock_drifting_factor: 时钟漂移因子
        """
        self.clients = client_instances
        self.retry_count = retry_count
        self.retry_delay = retry_delay
        self.clock_drifting_factor = clock_drifting_factor
        self.quorum = len(client_instances) // 2 + 1
        self.lock_script = """
        if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then
            redis.call('pexpire', KEYS[1], ARGV[2])
            return 1
        end
        return 0
        """
        self.unlock_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
    
    def lock(self, resource: str, ttl_ms: int = 30000) -> dict:
        """
        获取锁
        
        Args:
            resource: 锁的资源名
            ttl_ms: 锁过期时间(毫秒)
        
        Returns:
            成功: {'validity': True, 'token': 'xxx'}
            失败: {'validity': False, 'token': None}
        """
        token = str(uuid.uuid4())
        start_time = time.time()
        
        for _ in range(self.retry_count):
            acquired = []
            for client in self.clients:
                try:
                    result = client.eval(
                        self.lock_script, 1, 
                        f"lock:{resource}", 
                        token, 
                        ttl_ms
                    )
                    if result == 1:
                        acquired.append(True)
                except Exception:
                    acquired.append(False)
            
            # 检查是否获得多数票
            acquired_count = sum(acquired)
            elapsed_ms = (time.time() - start_time) * 1000
            
            # 时钟漂移补偿
            drift = int(ttl_ms * self.clock_drifting_factor) + 2
            
            validity_time = ttl_ms - elapsed_ms - drift
            
            if acquired_count >= self.quorum and validity_time > 0:
                return {
                    'validity': True,
                    'token': token,
                    'validity_ms': validity_time,
                    'servers': acquired_count
                }
            
            # 失败,释放所有锁
            for client in self.clients:
                try:
                    client.eval(
                        self.unlock_script, 1, 
                        f"lock:{resource}", 
                        token
                    )
                except Exception:
                    pass
            
            time.sleep(self.retry_delay)
        
        return {'validity': False, 'token': None}
    
    def unlock(self, resource: str, token: str):
        """释放锁"""
        for client in self.clients:
            try:
                client.eval(
                    self.unlock_script, 1, 
                    f"lock:{resource}", 
                    token
                )
            except Exception:
                pass


# 使用示例
clients = [
    redis.Redis(host='redis1', port=6379, decode_responses=True),
    redis.Redis(host='redis2', port=6379, decode_responses=True),
    redis.Redis(host='redis3', port=6379, decode_responses=True),
]

lock_manager = RedLock(clients)

result = lock_manager.lock("order:12345", ttl_ms=30000)
if result['validity']:
    try:
        process_order()
    finally:
        lock_manager.unlock("order:12345", result['token'])

可重入锁

实现原理

┌─────────────────────────────────────────────────────────────────┐
│                    可重入锁原理                                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │  锁持有者追踪                                            │   │
│   │                                                         │   │
│   │   lock:order:123  ──► {token: count}                   │   │
│   │                      └──> UUID1: 3                       │   │
│   │                                                         │   │
│   │   同一个线程多次获取锁,增加计数                          │   │
│   │   释放锁时减少计数,计数为0时删除锁                       │   │
│   └─────────────────────────────────────────────────────────┘   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Python 实现

class ReentrantLock:
    def __init__(self, client, lock_name, timeout=30):
        self.client = client
        self.lock_name = f"rlock:{lock_name}"
        self.timeout = timeout
        self.token = str(uuid.uuid4())
        self.count = 0
        self.lock_key = f"rlock:{lock_name}:{self.token}"
    
    def acquire(self) -> bool:
        """获取锁"""
        if self.count > 0:
            # 重入:增加计数
            self.count += 1
            return True
        
        # 首次获取
        acquired = self.client.set(
            self.lock_name, 
            self.token, 
            nx=True, 
            ex=self.timeout
        )
        
        if acquired:
            self.count = 1
            # 存储锁计数
            self.client.set(self.lock_key, 1, ex=self.timeout)
        
        return acquired
    
    def release(self):
        """释放锁"""
        if self.count <= 0:
            return
        
        self.count -= 1
        
        if self.count == 0:
            # 释放锁
            script = """
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
            """
            self.client.eval(script, 1, self.lock_name, self.token)
            self.client.delete(self.lock_key)
    
    def __enter__(self):
        if not self.acquire():
            raise RuntimeError("Failed to acquire lock")
        return self
    
    def __exit__(self, *args):
        self.release()
        return False


# 使用示例
with ReentrantLock(client, "resource") as lock:
    with ReentrantLock(client, "resource") as lock2:  # 重入
        # 同一线程可以多次获取
        access_resource()

常见问题

1. 锁过期但任务未完成

def process_with_lock(client, task_id):
    """
    锁过期但任务未完成的处理
    方案:使用看门狗自动续期
    """
    lock = RedisLock(client, f"task:{task_id}", timeout=30)
    
    # 后台线程续期
    def watchdog():
        while True:
            time.sleep(10)
            if lock.token:
                lock.extend(30)
    
    # 启动续期线程
    import threading
    thread = threading.Thread(target=watchdog, daemon=True)
    thread.start()
    
    try:
        lock.acquire()
        execute_task(task_id)
    finally:
        lock.release()

2. 分布式环境下时钟不同步

# 使用 RedLock 减轻时钟问题
# 或使用 Redis 实例时间而非本地时间

3. 主从切换导致锁丢失

# 使用 RedLock 或 Redisson
# Redisson 内置了完善的分布式锁实现

使用场景示例

订单处理

def process_order(order_id):
    """处理订单,防止重复处理"""
    lock_key = f"order:process:{order_id}"
    
    with RedisLock(client, lock_key, timeout=60) as lock:
        # 检查订单状态
        order = get_order(order_id)
        if order.status != 'pending':
            return
        
        # 处理订单
        process_payment(order)
        update_order_status(order_id, 'completed')

库存扣减

def deduct_stock(product_id, quantity):
    """扣减库存,防止超卖"""
    lock_key = f"stock:lock:{product_id}"
    
    with RedisLock(client, lock_key, timeout=10) as lock:
        stock = int(client.get(f"stock:{product_id}") or 0)
        
        if stock < quantity:
            raise ValueError("库存不足")
        
        new_stock = stock - quantity
        client.set(f"stock:{product_id}", new_stock)
        
        return new_stock

定时任务协调

def scheduled_task(task_name):
    """定时任务,确保单节点执行"""
    lock_key = f"scheduler:{task_name}"
    
    with RedisLock(client, lock_key, timeout=3600) as lock:
        execute_scheduled_task(task_name)

下一步

接下来让我们学习 Redis Lua 脚本。

👉 Lua 脚本