第十三章:Lua 脚本
深入了解 Redis Lua 脚本,实现复杂的原子操作和业务逻辑。
最后更新: 2024-01-15
页面目录
Redis Lua 脚本
Redis Lua 脚本允许在服务器端执行复杂的原子操作,是实现高性能业务逻辑的重要工具。
Lua 脚本概述
┌─────────────────────────────────────────────────────────────────┐
│ Redis Lua 脚本架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Client │
│ │ │
│ │ EVAL script numkeys keys[] args[] │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Redis Server │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Lua VM │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────────────────────────────────┐ │ │ │
│ │ │ │ Lua Script │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ Redis.call('GET', KEYS[1]) │ │ │ │
│ │ │ │ Redis.call('SET', KEYS[1], ARGV[1]) │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ └─────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ 原子执行,不会被其他命令打断 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Lua 脚本特点
| 特点 | 说明 |
|---|---|
| 原子性 | 脚本执行期间不会被其他命令打断 |
| 高效 | 减少客户端与服务器往返次数 |
| 可复用 | 一次编译,多次执行 |
| 功能强大 | 实现复杂业务逻辑 |
基本命令
EVAL
# 基本语法
EVAL script numkeys key [key ...] arg [arg ...]
# 示例
EVAL "return redis.call('GET', KEYS[1])" 1 mykey
# 传参示例
EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 mykey "hello"
# 多键示例
EVAL "return redis.call('MGET', KEYS[1], KEYS[2])" 2 key1 key2
EVALSHA
# 先加载脚本获取 SHA1
SCRIPT LOAD "return redis.call('GET', KEYS[1])"
# 返回: "c4a8c54c1f5d7d0f2b3e..."
# 使用 SHA1 执行
EVALSHA "c4a8c54c1f5d7d0f2b3e..." 1 mykey
其他命令
# 检查脚本是否存在
SCRIPT EXISTS sha1 [sha1 ...]
# 杀死运行中的脚本
SCRIPT KILL
# 清除脚本缓存
SCRIPT FLUSH
常用脚本示例
1. 分布式锁
-- 获取锁
-- KEYS[1] = 锁名
-- ARGV[1] = 过期时间(秒)
-- ARGV[2] = 唯一标识
local lock = redis.call('SETNX', KEYS[1], ARGV[2])
if lock == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[1])
return 1
else
return 0
end
2. 释放锁(安全)
-- 释放锁(只能释放自己持有的锁)
-- KEYS[1] = 锁名
-- ARGV[1] = 唯一标识
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
3. 限流器
-- 滑动窗口限流
-- KEYS[1] = 限流键
-- ARGV[1] = 窗口大小(秒)
-- ARGV[2] = 最大请求数
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local current = redis.call('ZCARD', key)
if current and current >= limit then
return 0
end
redis.call('ZADD', key, redis.call('TIME')[1], redis.call('TIME')[2])
redis.call('EXPIRE', key, window)
return 1
4. 计数器
-- 原子递增(带过期)
-- KEYS[1] = 计数器键
-- ARGV[1] = TTL(秒)
-- ARGV[2] = 步长
local key = KEYS[1]
local ttl = tonumber(ARGV[1])
local step = tonumber(ARGV[2]) or 1
local count = redis.call('INCRBY', key, step)
if count == step then
redis.call('EXPIRE', key, ttl)
end
return count
5. 商品库存扣减
-- 原子扣减库存
-- KEYS[1] = 库存键
-- ARGV[1] = 扣减数量
local stock = redis.call('GET', KEYS[1])
if not stock then
return -1 -- 商品不存在
end
stock = tonumber(stock)
local quantity = tonumber(ARGV[1])
if stock < quantity then
return -2 -- 库存不足
end
local new_stock = stock - quantity
redis.call('SET', KEYS[1], new_stock)
return new_stock
Python 中使用 Lua
直接执行
import redis
client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 执行 Lua 脚本
script = """
local count = redis.call('INCR', KEYS[1])
if count == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return count
"""
result = client.eval(script, 1, "counter", 3600)
print(f"计数器值: {result}")
预加载脚本
# 预加载脚本获取 SHA
script_sha = client.script_load("""
local count = redis.call('INCR', KEYS[1])
if count == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return count
""")
# 执行预加载的脚本
result = client.evalsha(script_sha, 1, "counter", 3600)
脚本缓存自动加载
# Redis 会自动处理脚本缓存
# 使用 register_script 注册脚本
get_script = client.register_script("""
return redis.call('GET', KEYS[1])
""")
# 执行
result = get_script(keys=['mykey'])
高级用法
批量操作
-- 批量设置和获取
-- KEYS = [set_key1, set_key2, get_key1]
-- ARGV = [value1, value2]
-- 设置
redis.call('SET', KEYS[1], ARGV[1])
redis.call('SET', KEYS[2], ARGV[2])
-- 获取
local value = redis.call('GET', KEYS[3])
return value
条件操作
-- 条件设置(满足条件才设置)
-- KEYS[1] = 键
-- ARGV[1] = 新值
-- ARGV[2] = 条件值
local current = redis.call('GET', KEYS[1])
if current == ARGV[2] or current == nil then
redis.call('SET', KEYS[1], ARGV[1])
return 1
else
return 0
end
循环和遍历
-- 处理多个键
-- KEYS = [key1, key2, key3]
local results = {}
for i, key in ipairs(KEYS) do
local value = redis.call('GET', key)
table.insert(results, value)
end
return cjson.encode(results)
表操作
-- 使用 Redis 的表结构
-- 返回用户信息
local user = {
name = redis.call('HGET', KEYS[1], 'name'),
age = redis.call('HGET', KEYS[1], 'age'),
email = redis.call('HGET', KEYS[1], 'email')
}
return cjson.encode(user)
错误处理
pcall 调用
-- 使用 pcall 捕获错误
local ok, result = pcall(function()
return redis.call('GET', KEYS[1])
end)
if ok then
return result
else
return "Error: " .. result
end
条件检查
-- 检查键是否存在
if redis.call('EXISTS', KEYS[1]) == 0 then
return nil
end
-- 检查类型
if redis.call('TYPE', KEYS[1])['ok'] ~= 'string' then
return redis.error_reply("WRONGTYPE")
end
常用库
cjson 库
-- JSON 编码
local data = {name = "张三", age = 25}
local json_str = cjson.encode(data)
-- JSON 解码
local obj = cjson.decode(json_str)
print(obj.name)
cmsgpack 库
-- MessagePack 序列化(更高效)
local data = {name = "张三", age = 25}
local packed = cmsgpack.pack(data)
local unpacked = cmsgpack.unpack(packed)
性能考虑
| 考虑项 | 建议 |
|---|---|
| 脚本长度 | 保持简洁,避免过长脚本 |
| 执行时间 | 设置合理的超时时间 |
| 循环 | 避免大循环,尽量使用 Redis 原生命令 |
| 内存 | 注意脚本返回数据大小 |
超时配置
# 设置脚本最大执行时间(毫秒)
lua-time-limit 5000
完整示例
秒杀系统
-- 秒杀脚本
-- KEYS[1] = 库存键
-- KEYS[2] = 用户已购买记录键
-- ARGV[1] = 用户ID
-- ARGV[2] = 购买数量
-- ARGV[3] = 最大购买数量
-- 检查是否已购买
local bought = redis.call('SISMEMBER', KEYS[2], ARGV[1])
if bought == 1 then
return {-1, "Already purchased"}
end
-- 检查库存
local stock = tonumber(redis.call('GET', KEYS[1]) or 0)
local quantity = tonumber(ARGV[2])
if stock < quantity then
return {-2, "Insufficient stock"}
end
-- 扣减库存
local new_stock = stock - quantity
redis.call('SET', KEYS[1], new_stock)
-- 记录购买
redis.call('SADD', KEYS[2], ARGV[1])
return {new_stock, "Success"}
Python 调用
seckill_script = """
local stock_key = KEYS[1]
local user_key = KEYS[2]
local user_id = ARGV[1]
local quantity = tonumber(ARGV[2])
-- 检查是否已购买
if redis.call('SISMEMBER', user_key, user_id) == 1 then
return {-1, "Already purchased"}
end
-- 检查库存
local stock = tonumber(redis.call('GET', stock_key) or 0)
if stock < quantity then
return {-2, "Insufficient stock"}
end
-- 扣减库存
redis.call('SET', stock_key, stock - quantity)
redis.call('SADD', user_key, user_id)
return {stock - quantity, "Success"}
"""
result = client.eval(seckill_script, 2,
"stock:product:001",
"user:bought:product:001",
"user:12345", 1)
print(result)
下一步
接下来让我们学习 Redis 运维管理。
👉 运维管理