第十章:存储过程与触发器

最后更新: 2024-01-01 作者: PostgreSQL Team
页面目录

第十章:存储过程与触发器

10.1 PL/pgSQL 高级特性

10.1.1 数组操作

CREATE OR REPLACE FUNCTION process_user_tags(tag_array TEXT[])
RETURNS TABLE(tag TEXT, count INTEGER) AS $$
DECLARE
    tag_record RECORD;
BEGIN
    FOREACH tag IN ARRAY tag_array
    LOOP
        -- 统计每个标签
        SELECT COUNT(*) INTO tag_record.count
        FROM user_tags
        WHERE tag_name = tag;
        
        tag := tag;
        count := COALESCE(tag_record.count, 0);
        RETURN NEXT;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

10.1.2 游标使用

CREATE OR REPLACE FUNCTION process_large_orders()
RETURNS VOID AS $$
DECLARE
    order_cursor CURSOR FOR 
        SELECT id, user_id, total FROM orders WHERE status = 'pending';
    order_row order_cursor%ROWTYPE;
BEGIN
    OPEN order_cursor;
    
    LOOP
        FETCH order_cursor INTO order_row;
        EXIT WHEN NOT FOUND;
        
        -- 处理每条订单
        PERFORM process_order(order_row.id);
    END LOOP;
    
    CLOSE order_cursor;
END;
$$ LANGUAGE plpgsql;

10.1.3 动态 SQL

CREATE OR REPLACE FUNCTION dynamic_query(table_name TEXT, column_name TEXT, value ANYELEMENT)
RETURNS TABLE(result JSON) AS $$
DECLARE
    sql_query TEXT;
BEGIN
    sql_query := format('SELECT row_to_json(t) FROM %I t WHERE %I = $1', table_name, column_name);
    
    RETURN QUERY EXECUTE sql_query USING value;
END;
$$ LANGUAGE plpgsql;

-- 使用 EXECUTE 执行 DDL
CREATE OR REPLACE FUNCTION create_archive_table(source_table TEXT)
RETURNS VOID AS $$
DECLARE
    archive_table TEXT := source_table || '_archive';
BEGIN
    EXECUTE format('CREATE TABLE IF NOT EXISTS %I (LIKE %I INCLUDING ALL)', archive_table, source_table);
END;
$$ LANGUAGE plpgsql;

10.2 高级触发器

10.2.1 触发器条件

-- 仅在特定列更新时触发
CREATE TRIGGER update_timestamp
BEFORE UPDATE OF email, phone ON users
FOR EACH ROW EXECUTE FUNCTION update_modified_column();

-- WHEN 子句条件
CREATE TRIGGER check_order_total
BEFORE INSERT ON orders
FOR EACH ROW
WHEN (NEW.total > 10000)
EXECUTE FUNCTION high_value_order_alert();

10.2.2 触发器链

-- 创建审计追踪表
CREATE TABLE audit_log (
    id SERIAL PRIMARY KEY,
    table_name TEXT NOT NULL,
    operation TEXT NOT NULL,
    old_data JSONB,
    new_data JSONB,
    changed_by TEXT,
    changed_at TIMESTAMPTZ DEFAULT NOW()
);

-- 通用审计触发器函数
CREATE OR REPLACE FUNCTION audit_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO audit_log (table_name, operation, new_data, changed_by)
        VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO audit_log (table_name, operation, old_data, new_data, changed_by)
        VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD), row_to_json(NEW), current_user);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO audit_log (table_name, operation, old_data, changed_by)
        VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD), current_user);
        RETURN OLD;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- 应用到多个表
CREATE TRIGGER users_audit AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_changes();

CREATE TRIGGER orders_audit AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_changes();

10.2.3 增量更新触发器

-- 创建库存表
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    stock INTEGER DEFAULT 0,
    last_updated TIMESTAMPTZ DEFAULT NOW()
);

-- 库存变化日志
CREATE TABLE stock_changes (
    id SERIAL PRIMARY KEY,
    product_id INTEGER REFERENCES products(id),
    quantity_change INTEGER,
    reason TEXT,
    changed_at TIMESTAMPTZ DEFAULT NOW()
);

-- 更新库存的存储过程
CREATE OR REPLACE FUNCTION update_stock(
    p_product_id INTEGER,
    p_quantity INTEGER,
    p_reason TEXT
)
RETURNS VOID AS $$
BEGIN
    UPDATE products 
    SET stock = stock + p_quantity,
        last_updated = NOW()
    WHERE id = p_product_id;
    
    INSERT INTO stock_changes (product_id, quantity_change, reason)
    VALUES (p_product_id, p_quantity, p_reason);
    
    -- 检查库存
    IF (SELECT stock FROM products WHERE id = p_product_id) < 0 THEN
        RAISE WARNING '产品 % 库存为负数!', p_product_id;
    END IF;
END;
$$ LANGUAGE plpgsql;

-- 触发器调用
CREATE TRIGGER trg_update_stock
AFTER INSERT OR UPDATE ON stock_changes
FOR EACH ROW EXECUTE FUNCTION sync_product_stock();

CREATE OR REPLACE FUNCTION sync_product_stock()
RETURNS TRIGGER AS $$
BEGIN
    UPDATE products
    SET stock = (
        SELECT COALESCE(SUM(quantity_change), 0) 
        FROM stock_changes 
        WHERE product_id = NEW.product_id
    )
    WHERE id = NEW.product_id;
    
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

10.3 自动化任务

10.3.1 pg_cron 扩展

-- 安装 pg_cron
CREATE EXTENSION pg_cron;

-- 配置
ALTER SYSTEM SET cron.database_name = 'mydb';

-- 每天凌晨 2 点清理过期会话
SELECT cron.schedule('cleanup-sessions', '0 2 * * *', 
    $$DELETE FROM sessions WHERE expires_at < NOW()$$);

-- 每周一凌晨 3 点生成报告
SELECT cron.schedule('weekly-report', '0 3 * * 1', 
    $$SELECT generate_weekly_report()$$);

-- 每月 1 日凌晨 1 点归档数据
SELECT cron.schedule('archive-old-data', '0 1 1 * *', 
    $$SELECT archive_old_orders()$$);

-- 查看定时任务
SELECT * FROM cron.job;

-- 删除任务
SELECT cron.unschedule('cleanup-sessions');

10.3.2 使用 pg_agent(需要单独安装)

-- 在 pgAdmin 中创建 pgAgent 作业
-- 支持更复杂的调度:每 5 分钟、每小时等

10.4 批处理与数据转换

10.4.1 批量处理大表

CREATE OR REPLACE FUNCTION batch_update_status(
    batch_size INTEGER DEFAULT 1000
)
RETURNS INTEGER AS $$
DECLARE
    updated_count INTEGER := 0;
    min_id INTEGER;
    max_id INTEGER;
BEGIN
    -- 获取 ID 范围
    SELECT MIN(id), MAX(id) INTO min_id, max_id
    FROM orders
    WHERE status = 'pending' AND updated_at < NOW() - INTERVAL '30 days';
    
    -- 分批处理
    WHILE min_id <= max_id LOOP
        UPDATE orders
        SET status = 'expired',
            updated_at = NOW()
        WHERE id BETWEEN min_id AND min_id + batch_size - 1
            AND status = 'pending';
        
        updated_count := updated_count + SQL%ROWCOUNT;
        min_id := min_id + batch_size;
        
        -- 记录进度
        RAISE NOTICE '已更新 % 条记录', updated_count;
    END LOOP;
    
    RETURN updated_count;
END;
$$ LANGUAGE plpgsql;

10.4.2 数据清洗函数

CREATE OR REPLACE FUNCTION clean_phone_number(phone TEXT)
RETURNS TEXT AS $$
BEGIN
    -- 移除非数字字符
    RETURN regexp_replace(phone, '[^0-9]', '', 'g');
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION normalize_email(email TEXT)
RETURNS TEXT AS $$
BEGIN
    -- 转换为小写,移除多余空格
    RETURN LOWER(TRIM(email));
END;
$$ LANGUAGE plpgsql;

-- 应用到数据
UPDATE users SET 
    phone = clean_phone_number(phone),
    email = normalize_email(email);

10.5 本章小结

主题 关键点
高级 PL/pgSQL 数组、游标、动态SQL
触发器 条件触发器、审计追踪
自动化 pg_cron 定时任务
批处理 分批更新、数据清洗

📌 下一章预告

下一章将介绍 PostgreSQL 的分区表机制。