第十章:存储过程与触发器
最后更新: 2024-01-01
作者: PostgreSQL Team
页面目录
第十章:存储过程与触发器
10.1 PL/pgSQL 高级特性
10.1.1 数组操作
CREATE OR REPLACE FUNCTION process_user_tags(tag_array TEXT[])
RETURNS TABLE(tag TEXT, count INTEGER) AS $$
DECLARE
tag_record RECORD;
BEGIN
FOREACH tag IN ARRAY tag_array
LOOP
-- 统计每个标签
SELECT COUNT(*) INTO tag_record.count
FROM user_tags
WHERE tag_name = tag;
tag := tag;
count := COALESCE(tag_record.count, 0);
RETURN NEXT;
END LOOP;
END;
$$ LANGUAGE plpgsql;
10.1.2 游标使用
CREATE OR REPLACE FUNCTION process_large_orders()
RETURNS VOID AS $$
DECLARE
order_cursor CURSOR FOR
SELECT id, user_id, total FROM orders WHERE status = 'pending';
order_row order_cursor%ROWTYPE;
BEGIN
OPEN order_cursor;
LOOP
FETCH order_cursor INTO order_row;
EXIT WHEN NOT FOUND;
-- 处理每条订单
PERFORM process_order(order_row.id);
END LOOP;
CLOSE order_cursor;
END;
$$ LANGUAGE plpgsql;
10.1.3 动态 SQL
CREATE OR REPLACE FUNCTION dynamic_query(table_name TEXT, column_name TEXT, value ANYELEMENT)
RETURNS TABLE(result JSON) AS $$
DECLARE
sql_query TEXT;
BEGIN
sql_query := format('SELECT row_to_json(t) FROM %I t WHERE %I = $1', table_name, column_name);
RETURN QUERY EXECUTE sql_query USING value;
END;
$$ LANGUAGE plpgsql;
-- 使用 EXECUTE 执行 DDL
CREATE OR REPLACE FUNCTION create_archive_table(source_table TEXT)
RETURNS VOID AS $$
DECLARE
archive_table TEXT := source_table || '_archive';
BEGIN
EXECUTE format('CREATE TABLE IF NOT EXISTS %I (LIKE %I INCLUDING ALL)', archive_table, source_table);
END;
$$ LANGUAGE plpgsql;
10.2 高级触发器
10.2.1 触发器条件
-- 仅在特定列更新时触发
CREATE TRIGGER update_timestamp
BEFORE UPDATE OF email, phone ON users
FOR EACH ROW EXECUTE FUNCTION update_modified_column();
-- WHEN 子句条件
CREATE TRIGGER check_order_total
BEFORE INSERT ON orders
FOR EACH ROW
WHEN (NEW.total > 10000)
EXECUTE FUNCTION high_value_order_alert();
10.2.2 触发器链
-- 创建审计追踪表
CREATE TABLE audit_log (
id SERIAL PRIMARY KEY,
table_name TEXT NOT NULL,
operation TEXT NOT NULL,
old_data JSONB,
new_data JSONB,
changed_by TEXT,
changed_at TIMESTAMPTZ DEFAULT NOW()
);
-- 通用审计触发器函数
CREATE OR REPLACE FUNCTION audit_changes()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO audit_log (table_name, operation, new_data, changed_by)
VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW), current_user);
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO audit_log (table_name, operation, old_data, new_data, changed_by)
VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD), row_to_json(NEW), current_user);
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO audit_log (table_name, operation, old_data, changed_by)
VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD), current_user);
RETURN OLD;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- 应用到多个表
CREATE TRIGGER users_audit AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_changes();
CREATE TRIGGER orders_audit AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_changes();
10.2.3 增量更新触发器
-- 创建库存表
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
stock INTEGER DEFAULT 0,
last_updated TIMESTAMPTZ DEFAULT NOW()
);
-- 库存变化日志
CREATE TABLE stock_changes (
id SERIAL PRIMARY KEY,
product_id INTEGER REFERENCES products(id),
quantity_change INTEGER,
reason TEXT,
changed_at TIMESTAMPTZ DEFAULT NOW()
);
-- 更新库存的存储过程
CREATE OR REPLACE FUNCTION update_stock(
p_product_id INTEGER,
p_quantity INTEGER,
p_reason TEXT
)
RETURNS VOID AS $$
BEGIN
UPDATE products
SET stock = stock + p_quantity,
last_updated = NOW()
WHERE id = p_product_id;
INSERT INTO stock_changes (product_id, quantity_change, reason)
VALUES (p_product_id, p_quantity, p_reason);
-- 检查库存
IF (SELECT stock FROM products WHERE id = p_product_id) < 0 THEN
RAISE WARNING '产品 % 库存为负数!', p_product_id;
END IF;
END;
$$ LANGUAGE plpgsql;
-- 触发器调用
CREATE TRIGGER trg_update_stock
AFTER INSERT OR UPDATE ON stock_changes
FOR EACH ROW EXECUTE FUNCTION sync_product_stock();
CREATE OR REPLACE FUNCTION sync_product_stock()
RETURNS TRIGGER AS $$
BEGIN
UPDATE products
SET stock = (
SELECT COALESCE(SUM(quantity_change), 0)
FROM stock_changes
WHERE product_id = NEW.product_id
)
WHERE id = NEW.product_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
10.3 自动化任务
10.3.1 pg_cron 扩展
-- 安装 pg_cron
CREATE EXTENSION pg_cron;
-- 配置
ALTER SYSTEM SET cron.database_name = 'mydb';
-- 每天凌晨 2 点清理过期会话
SELECT cron.schedule('cleanup-sessions', '0 2 * * *',
$$DELETE FROM sessions WHERE expires_at < NOW()$$);
-- 每周一凌晨 3 点生成报告
SELECT cron.schedule('weekly-report', '0 3 * * 1',
$$SELECT generate_weekly_report()$$);
-- 每月 1 日凌晨 1 点归档数据
SELECT cron.schedule('archive-old-data', '0 1 1 * *',
$$SELECT archive_old_orders()$$);
-- 查看定时任务
SELECT * FROM cron.job;
-- 删除任务
SELECT cron.unschedule('cleanup-sessions');
10.3.2 使用 pg_agent(需要单独安装)
-- 在 pgAdmin 中创建 pgAgent 作业
-- 支持更复杂的调度:每 5 分钟、每小时等
10.4 批处理与数据转换
10.4.1 批量处理大表
CREATE OR REPLACE FUNCTION batch_update_status(
batch_size INTEGER DEFAULT 1000
)
RETURNS INTEGER AS $$
DECLARE
updated_count INTEGER := 0;
min_id INTEGER;
max_id INTEGER;
BEGIN
-- 获取 ID 范围
SELECT MIN(id), MAX(id) INTO min_id, max_id
FROM orders
WHERE status = 'pending' AND updated_at < NOW() - INTERVAL '30 days';
-- 分批处理
WHILE min_id <= max_id LOOP
UPDATE orders
SET status = 'expired',
updated_at = NOW()
WHERE id BETWEEN min_id AND min_id + batch_size - 1
AND status = 'pending';
updated_count := updated_count + SQL%ROWCOUNT;
min_id := min_id + batch_size;
-- 记录进度
RAISE NOTICE '已更新 % 条记录', updated_count;
END LOOP;
RETURN updated_count;
END;
$$ LANGUAGE plpgsql;
10.4.2 数据清洗函数
CREATE OR REPLACE FUNCTION clean_phone_number(phone TEXT)
RETURNS TEXT AS $$
BEGIN
-- 移除非数字字符
RETURN regexp_replace(phone, '[^0-9]', '', 'g');
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION normalize_email(email TEXT)
RETURNS TEXT AS $$
BEGIN
-- 转换为小写,移除多余空格
RETURN LOWER(TRIM(email));
END;
$$ LANGUAGE plpgsql;
-- 应用到数据
UPDATE users SET
phone = clean_phone_number(phone),
email = normalize_email(email);
10.5 本章小结
| 主题 | 关键点 |
|---|---|
| 高级 PL/pgSQL | 数组、游标、动态SQL |
| 触发器 | 条件触发器、审计追踪 |
| 自动化 | pg_cron 定时任务 |
| 批处理 | 分批更新、数据清洗 |
📌 下一章预告
下一章将介绍 PostgreSQL 的分区表机制。