第六章:聚合管道
最后更新: 2024-01-01
作者: MongoDB Team
页面目录
第六章:聚合管道
使用 MongoDB 聚合框架进行复杂数据处理和分析
6.1 聚合管道概述
┌─────────────────────────────────────────────────────────────────┐
│ 聚合管道流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ $match │───▶│ $group │───▶│ $sort │───▶│ $project│ │
│ │ 过滤 │ │ 分组 │ │ 排序 │ │ 投影 │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 输入 ─────────────────────────────────────────────────▶ 输出 │
│ │
└─────────────────────────────────────────────────────────────────┘
基本语法
db.collection.aggregate([
{ $stage1: { ... } },
{ $stage2: { ... } },
{ $stage3: { ... } }
], { options })
6.2 $match - 过滤阶段
// 基本过滤
db.orders.aggregate([
{ $match: { status: "completed" } }
])
// 多条件过滤
db.orders.aggregate([
{
$match: {
status: "completed",
created_at: { $gte: ISODate("2024-01-01") },
total_amount: { $gte: 100 }
}
}
])
// 在管道开头使用 $match 可以利用索引
db.orders.aggregate([
{ $match: { user_id: ObjectId("...") } },
{ $sort: { created_at: -1 } }
])
6.3 $group - 分组阶段
基本分组
// 统计文档数量
db.orders.aggregate([
{ $group: { _id: null, count: { $sum: 1 } } }
])
// 按字段分组
db.orders.aggregate([
{ $group: { _id: "$status", count: { $sum: 1 } } }
])
// 按多个字段分组
db.orders.aggregate([
{
$group: {
_id: { status: "$status", payment_method: "$payment.method" },
count: { $sum: 1 },
total: { $sum: "$total_amount" }
}
}
])
聚合累加器
| 累加器 | 说明 | 示例 |
|---|---|---|
$sum |
求和 | { $sum: "$amount" } |
$avg |
平均值 | { $avg: "$score" } |
$min |
最小值 | { $min: "$price" } |
$max |
最大值 | { $max: "$price" } |
$push |
添加到数组 | { $push: "$product" } |
$addToSet |
添加到集合(去重) | { $addToSet: "$category" } |
$first |
第一条 | { $first: "$name" } |
$last |
最后一条 | { $last: "$date" } |
$stdDevPop |
总体标准差 | { $stdDevPop: "$price" } |
$stdDevSamp |
样本标准差 | { $stdDevSamp: "$price" } |
// 订单统计
db.orders.aggregate([
{
$group: {
_id: "$user_id",
total_orders: { $sum: 1 },
total_spent: { $sum: "$total_amount" },
avg_order_value: { $avg: "$total_amount" },
min_order: { $min: "$total_amount" },
max_order: { $max: "$total_amount" },
first_order_date: { $first: "$created_at" },
last_order_date: { $last: "$created_at" }
}
}
])
// 收集分组内的值
db.orders.aggregate([
{
$group: {
_id: "$status",
orders: { $push: { id: "$_id", amount: "$total_amount" } }
}
}
])
6.4 $project - 投影阶段
基本投影
// 重命名字段
db.orders.aggregate([
{
$project: {
order_id: "$_id",
user_id: 1,
total: "$total_amount",
_id: 0
}
}
])
// 排除字段
db.orders.aggregate([
{
$project: {
shipping_address: 0,
payment_info: 0
}
}
])
字段操作
// 创建计算字段
db.orders.aggregate([
{
$project: {
order_number: 1,
subtotal: "$total_amount",
tax: { $multiply: ["$total_amount", 0.1] },
total: {
$add: ["$total_amount", { $multiply: ["$total_amount", 0.1] }]
}
}
}
])
// 使用表达式
db.users.aggregate([
{
$project: {
full_name: { $concat: ["$first_name", " ", "$last_name"] },
age_group: {
$switch: {
branches: [
{ case: { $lt: ["$age", 18] }, then: "未成年" },
{ case: { $lt: ["$age", 30] }, then: "青年" },
{ case: { $lt: ["$age", 60] }, then: "中年" }
],
default: "老年"
}
}
}
}
])
常用表达式操作符
// 算术表达式
{ $add: [expr1, expr2, ...] } // 加法
{ $subtract: [expr1, expr2] } // 减法
{ $multiply: [expr1, expr2, ...] } // 乘法
{ $divide: [expr1, expr2] } // 除法
{ $mod: [expr1, expr2] } // 取模
{ $pow: [expr, exponent] } // 幂运算
{ $abs: expr } // 绝对值
{ $ceil: expr } // 向上取整
{ $floor: expr } // 向下取整
{ $round: [expr, decimals] } // 四舍五入
// 字符串表达式
{ $concat: [str1, str2, ...] } // 字符串连接
{ $substr: [str, start, length] } // 子字符串(已废弃)
{ $substrBytes: [str, start, length] } // 字节截取
{ $split: [str, delimiter] } // 分割字符串
{ $toLower: str } // 转小写
{ $toUpper: str } // 转大写
{ $trim: { string: str } } // 去除首尾空白
{ $ltrim: { string: str } } // 去除首部空白
{ $rtrim: { string: str } } // 去除尾部空白
{ $regexFind: { input: str, regex: pattern } } // 正则匹配
6.5 $sort - 排序阶段
// 单字段排序
db.orders.aggregate([
{ $sort: { created_at: -1 } }
])
// 多字段排序
db.orders.aggregate([
{
$sort: {
status: 1, // 先按状态升序
created_at: -1 // 再按创建时间降序
}
}
])
// 与其他阶段结合
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$user_id", total: { $sum: "$total_amount" } } },
{ $sort: { total: -1 } },
{ $limit: 10 } // Top 10 客户
])
6.6 $limit 和 $skip - 分页
// 获取前 N 条
db.orders.aggregate([
{ $sort: { created_at: -1 } },
{ $limit: 10 }
])
// 分页查询
const page = 2
const pageSize = 20
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $sort: { created_at: -1 } },
{ $skip: (page - 1) * pageSize },
{ $limit: pageSize }
])
6.7 $lookup - 表关联
基本语法
// 单字段关联
db.orders.aggregate([
{
$lookup: {
from: "users", // 关联的集合
localField: "user_id", // 本地字段
foreignField: "_id", // 外部集合字段
as: "user_info" // 输出字段名
}
}
])
示例:订单关联用户
// 订单集合中的 user_id 关联用户集合的 _id
db.orders.aggregate([
{
$lookup: {
from: "users",
localField: "user_id",
foreignField: "_id",
as: "user"
}
},
{ $unwind: "$user" }, // 展开数组
{
$project: {
order_number: 1,
total_amount: 1,
"user.username": 1,
"user.email": 1
}
}
])
关联同一集合
// 关联同一集合(自关联)
db.employees.aggregate([
{
$lookup: {
from: "employees",
localField: "manager_id",
foreignField: "_id",
as: "manager"
}
},
{
$project: {
name: 1,
title: 1,
manager_name: { $arrayElemAt: ["$manager.name", 0] }
}
}
])
复杂关联条件
// pipeline 形式的 $lookup(MongoDB 5.0+)
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { order_product_id: "$product_id" },
pipeline: [
{
$match: {
$expr: { $eq: ["$_id", "$$order_product_id"] }
}
},
{ $project: { name: 1, price: 1, category: 1 } }
],
as: "product_info"
}
}
])
6.8 $unwind - 展开数组
// 基本展开
db.orders.aggregate([
{ $unwind: "$items" },
{
$project: {
order_id: "$_id",
product: "$items.product",
quantity: "$items.quantity"
}
}
])
// 保留数组索引
db.orders.aggregate([
{ $unwind: { path: "$items", includeArrayIndex: "index" } }
])
// 过滤空数组
db.orders.aggregate([
{ $unwind: { path: "$items", preserveNullAndEmptyArrays: false } }
])
6.9 $bucket 和 $bucketAuto - 分桶
// $bucket - 固定边界分桶
db.sales.aggregate([
{
$bucket: {
groupBy: "$amount",
boundaries: [0, 100, 500, 1000, 5000, Infinity],
default: "Other",
output: {
count: { $sum: 1 },
sales: { $push: "$amount" }
}
}
}
])
// $bucketAuto - 自动分桶
db.sales.aggregate([
{
$bucketAuto: {
groupBy: "$amount",
buckets: 5, // 自动分成 5 个桶
output: {
count: { $sum: 1 },
min: { $min: "$amount" },
max: { $max: "$amount" },
avg: { $avg: "$amount" }
}
}
}
])
6.10 $facet - 多管道并行
db.orders.aggregate([
{
$facet: {
// 子管道 1:按状态统计
status_summary: [
{ $group: { _id: "$status", count: { $sum: 1 } } }
],
// 子管道 2:月度趋势
monthly_trend: [
{
$group: {
_id: {
year: { $year: "$created_at" },
month: { $month: "$created_at" }
},
count: { $sum: 1 },
total: { $sum: "$total_amount" }
}
},
{ $sort: { "_id.year": 1, "_id.month": 1 } }
],
// 子管道 3:Top 10 商品
top_products: [
{ $unwind: "$items" },
{
$group: {
_id: "$items.product_id",
total_quantity: { $sum: "$items.quantity" }
}
},
{ $sort: { total_quantity: -1 } },
{ $limit: 10 }
]
}
}
])
6.11 $graphLookup - 图查询
// 查找组织架构(递归查询)
db.employees.aggregate([
{
$graphLookup: {
from: "employees",
startWith: "$_id",
connectFromField: "_id",
connectToField: "manager_id",
as: "reporting_chain",
maxDepth: 10,
depthField: "level"
}
}
])
// 查找社交网络
db.users.aggregate([
{ $match: { username: "alice" } },
{
$graphLookup: {
from: "friendships",
startWith: "$_id",
connectFromField: "friend_id",
connectToField: "user_id",
as: "friends_of_friends",
maxDepth: 2,
restrictSearchWithMatch: { status: "accepted" }
}
}
])
6.12 聚合选项
// 添加选项
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$user_id", total: { $sum: "$total_amount" } } }
], {
allowDiskUse: true, // 允许使用磁盘存储中间结果
maxTimeMS: 60000, // 最大执行时间(毫秒)
batchSize: 100, // 每批返回文档数
comment: "customer_stats", // 注释(用于调试)
hint: { status: 1, date: -1 } // 强制使用特定索引
})
6.13 Map-Reduce(不推荐)
Map-Reduce 仍被支持,但聚合管道是更好的选择
// Map 函数
const mapFunction = function() {
emit(this.category, this.amount)
}
// Reduce 函数
const reduceFunction = function(key, values) {
return Array.sum(values)
}
// 执行 Map-Reduce
db.orders.mapReduce(
mapFunction,
reduceFunction,
{
out: { inline: 1 },
query: { status: "completed" }
}
)
💡 实践提示
- 管道顺序优化:将
$match放在前面减少后续处理的数据量 - 使用
$limit限制:在早期限制文档数量 $lookup性能:关联字段应该有索引- 内存限制:默认聚合管道使用 100MB 内存限制,使用
allowDiskUse扩展 - 索引利用:在
$match阶段使用索引
📚 继续学习