第六章:聚合管道

最后更新: 2024-01-01 作者: MongoDB Team
页面目录

第六章:聚合管道

使用 MongoDB 聚合框架进行复杂数据处理和分析

6.1 聚合管道概述

┌─────────────────────────────────────────────────────────────────┐
│                      聚合管道流程                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐     │
│  │  $match │───▶│  $group │───▶│  $sort  │───▶│ $project│     │
│  │  过滤   │    │   分组  │    │   排序  │    │  投影   │     │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘     │
│                                                                 │
│  输入 ─────────────────────────────────────────────────▶ 输出  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

基本语法

db.collection.aggregate([
  { $stage1: { ... } },
  { $stage2: { ... } },
  { $stage3: { ... } }
], { options })

6.2 $match - 过滤阶段

// 基本过滤
db.orders.aggregate([
  { $match: { status: "completed" } }
])

// 多条件过滤
db.orders.aggregate([
  {
    $match: {
      status: "completed",
      created_at: { $gte: ISODate("2024-01-01") },
      total_amount: { $gte: 100 }
    }
  }
])

// 在管道开头使用 $match 可以利用索引
db.orders.aggregate([
  { $match: { user_id: ObjectId("...") } },
  { $sort: { created_at: -1 } }
])

6.3 $group - 分组阶段

基本分组

// 统计文档数量
db.orders.aggregate([
  { $group: { _id: null, count: { $sum: 1 } } }
])

// 按字段分组
db.orders.aggregate([
  { $group: { _id: "$status", count: { $sum: 1 } } }
])

// 按多个字段分组
db.orders.aggregate([
  {
    $group: {
      _id: { status: "$status", payment_method: "$payment.method" },
      count: { $sum: 1 },
      total: { $sum: "$total_amount" }
    }
  }
])

聚合累加器

累加器 说明 示例
$sum 求和 { $sum: "$amount" }
$avg 平均值 { $avg: "$score" }
$min 最小值 { $min: "$price" }
$max 最大值 { $max: "$price" }
$push 添加到数组 { $push: "$product" }
$addToSet 添加到集合(去重) { $addToSet: "$category" }
$first 第一条 { $first: "$name" }
$last 最后一条 { $last: "$date" }
$stdDevPop 总体标准差 { $stdDevPop: "$price" }
$stdDevSamp 样本标准差 { $stdDevSamp: "$price" }
// 订单统计
db.orders.aggregate([
  {
    $group: {
      _id: "$user_id",
      total_orders: { $sum: 1 },
      total_spent: { $sum: "$total_amount" },
      avg_order_value: { $avg: "$total_amount" },
      min_order: { $min: "$total_amount" },
      max_order: { $max: "$total_amount" },
      first_order_date: { $first: "$created_at" },
      last_order_date: { $last: "$created_at" }
    }
  }
])

// 收集分组内的值
db.orders.aggregate([
  {
    $group: {
      _id: "$status",
      orders: { $push: { id: "$_id", amount: "$total_amount" } }
    }
  }
])

6.4 $project - 投影阶段

基本投影

// 重命名字段
db.orders.aggregate([
  {
    $project: {
      order_id: "$_id",
      user_id: 1,
      total: "$total_amount",
      _id: 0
    }
  }
])

// 排除字段
db.orders.aggregate([
  {
    $project: {
      shipping_address: 0,
      payment_info: 0
    }
  }
])

字段操作

// 创建计算字段
db.orders.aggregate([
  {
    $project: {
      order_number: 1,
      subtotal: "$total_amount",
      tax: { $multiply: ["$total_amount", 0.1] },
      total: {
        $add: ["$total_amount", { $multiply: ["$total_amount", 0.1] }]
      }
    }
  }
])

// 使用表达式
db.users.aggregate([
  {
    $project: {
      full_name: { $concat: ["$first_name", " ", "$last_name"] },
      age_group: {
        $switch: {
          branches: [
            { case: { $lt: ["$age", 18] }, then: "未成年" },
            { case: { $lt: ["$age", 30] }, then: "青年" },
            { case: { $lt: ["$age", 60] }, then: "中年" }
          ],
          default: "老年"
        }
      }
    }
  }
])

常用表达式操作符

// 算术表达式
{ $add: [expr1, expr2, ...] }      // 加法
{ $subtract: [expr1, expr2] }      // 减法
{ $multiply: [expr1, expr2, ...] }  // 乘法
{ $divide: [expr1, expr2] }       // 除法
{ $mod: [expr1, expr2] }          // 取模
{ $pow: [expr, exponent] }        // 幂运算
{ $abs: expr }                     // 绝对值
{ $ceil: expr }                    // 向上取整
{ $floor: expr }                   // 向下取整
{ $round: [expr, decimals] }      // 四舍五入

// 字符串表达式
{ $concat: [str1, str2, ...] }     // 字符串连接
{ $substr: [str, start, length] }  // 子字符串(已废弃)
{ $substrBytes: [str, start, length] }  // 字节截取
{ $split: [str, delimiter] }       // 分割字符串
{ $toLower: str }                   // 转小写
{ $toUpper: str }                   // 转大写
{ $trim: { string: str } }          // 去除首尾空白
{ $ltrim: { string: str } }         // 去除首部空白
{ $rtrim: { string: str } }        // 去除尾部空白
{ $regexFind: { input: str, regex: pattern } }  // 正则匹配

6.5 $sort - 排序阶段

// 单字段排序
db.orders.aggregate([
  { $sort: { created_at: -1 } }
])

// 多字段排序
db.orders.aggregate([
  {
    $sort: {
      status: 1,        // 先按状态升序
      created_at: -1    // 再按创建时间降序
    }
  }
])

// 与其他阶段结合
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: { _id: "$user_id", total: { $sum: "$total_amount" } } },
  { $sort: { total: -1 } },
  { $limit: 10 }  // Top 10 客户
])

6.6 $limit 和 $skip - 分页

// 获取前 N 条
db.orders.aggregate([
  { $sort: { created_at: -1 } },
  { $limit: 10 }
])

// 分页查询
const page = 2
const pageSize = 20
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $sort: { created_at: -1 } },
  { $skip: (page - 1) * pageSize },
  { $limit: pageSize }
])

6.7 $lookup - 表关联

基本语法

// 单字段关联
db.orders.aggregate([
  {
    $lookup: {
      from: "users",              // 关联的集合
      localField: "user_id",      // 本地字段
      foreignField: "_id",       // 外部集合字段
      as: "user_info"            // 输出字段名
    }
  }
])

示例:订单关联用户

// 订单集合中的 user_id 关联用户集合的 _id
db.orders.aggregate([
  {
    $lookup: {
      from: "users",
      localField: "user_id",
      foreignField: "_id",
      as: "user"
    }
  },
  { $unwind: "$user" },  // 展开数组
  {
    $project: {
      order_number: 1,
      total_amount: 1,
      "user.username": 1,
      "user.email": 1
    }
  }
])

关联同一集合

// 关联同一集合(自关联)
db.employees.aggregate([
  {
    $lookup: {
      from: "employees",
      localField: "manager_id",
      foreignField: "_id",
      as: "manager"
    }
  },
  {
    $project: {
      name: 1,
      title: 1,
      manager_name: { $arrayElemAt: ["$manager.name", 0] }
    }
  }
])

复杂关联条件

// pipeline 形式的 $lookup(MongoDB 5.0+)
db.orders.aggregate([
  {
    $lookup: {
      from: "products",
      let: { order_product_id: "$product_id" },
      pipeline: [
        {
          $match: {
            $expr: { $eq: ["$_id", "$$order_product_id"] }
          }
        },
        { $project: { name: 1, price: 1, category: 1 } }
      ],
      as: "product_info"
    }
  }
])

6.8 $unwind - 展开数组

// 基本展开
db.orders.aggregate([
  { $unwind: "$items" },
  {
    $project: {
      order_id: "$_id",
      product: "$items.product",
      quantity: "$items.quantity"
    }
  }
])

// 保留数组索引
db.orders.aggregate([
  { $unwind: { path: "$items", includeArrayIndex: "index" } }
])

// 过滤空数组
db.orders.aggregate([
  { $unwind: { path: "$items", preserveNullAndEmptyArrays: false } }
])

6.9 $bucket 和 $bucketAuto - 分桶

// $bucket - 固定边界分桶
db.sales.aggregate([
  {
    $bucket: {
      groupBy: "$amount",
      boundaries: [0, 100, 500, 1000, 5000, Infinity],
      default: "Other",
      output: {
        count: { $sum: 1 },
        sales: { $push: "$amount" }
      }
    }
  }
])

// $bucketAuto - 自动分桶
db.sales.aggregate([
  {
    $bucketAuto: {
      groupBy: "$amount",
      buckets: 5,  // 自动分成 5 个桶
      output: {
        count: { $sum: 1 },
        min: { $min: "$amount" },
        max: { $max: "$amount" },
        avg: { $avg: "$amount" }
      }
    }
  }
])

6.10 $facet - 多管道并行

db.orders.aggregate([
  {
    $facet: {
      // 子管道 1:按状态统计
      status_summary: [
        { $group: { _id: "$status", count: { $sum: 1 } } }
      ],
      // 子管道 2:月度趋势
      monthly_trend: [
        {
          $group: {
            _id: {
              year: { $year: "$created_at" },
              month: { $month: "$created_at" }
            },
            count: { $sum: 1 },
            total: { $sum: "$total_amount" }
          }
        },
        { $sort: { "_id.year": 1, "_id.month": 1 } }
      ],
      // 子管道 3:Top 10 商品
      top_products: [
        { $unwind: "$items" },
        {
          $group: {
            _id: "$items.product_id",
            total_quantity: { $sum: "$items.quantity" }
          }
        },
        { $sort: { total_quantity: -1 } },
        { $limit: 10 }
      ]
    }
  }
])

6.11 $graphLookup - 图查询

// 查找组织架构(递归查询)
db.employees.aggregate([
  {
    $graphLookup: {
      from: "employees",
      startWith: "$_id",
      connectFromField: "_id",
      connectToField: "manager_id",
      as: "reporting_chain",
      maxDepth: 10,
      depthField: "level"
    }
  }
])

// 查找社交网络
db.users.aggregate([
  { $match: { username: "alice" } },
  {
    $graphLookup: {
      from: "friendships",
      startWith: "$_id",
      connectFromField: "friend_id",
      connectToField: "user_id",
      as: "friends_of_friends",
      maxDepth: 2,
      restrictSearchWithMatch: { status: "accepted" }
    }
  }
])

6.12 聚合选项

// 添加选项
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: { _id: "$user_id", total: { $sum: "$total_amount" } } }
], {
  allowDiskUse: true,          // 允许使用磁盘存储中间结果
  maxTimeMS: 60000,            // 最大执行时间(毫秒)
  batchSize: 100,              // 每批返回文档数
  comment: "customer_stats",   // 注释(用于调试)
  hint: { status: 1, date: -1 }  // 强制使用特定索引
})

6.13 Map-Reduce(不推荐)

Map-Reduce 仍被支持,但聚合管道是更好的选择

// Map 函数
const mapFunction = function() {
  emit(this.category, this.amount)
}

// Reduce 函数
const reduceFunction = function(key, values) {
  return Array.sum(values)
}

// 执行 Map-Reduce
db.orders.mapReduce(
  mapFunction,
  reduceFunction,
  {
    out: { inline: 1 },
    query: { status: "completed" }
  }
)

💡 实践提示

  1. 管道顺序优化:将 $match 放在前面减少后续处理的数据量
  2. 使用 $limit 限制:在早期限制文档数量
  3. $lookup 性能:关联字段应该有索引
  4. 内存限制:默认聚合管道使用 100MB 内存限制,使用 allowDiskUse 扩展
  5. 索引利用:在 $match 阶段使用索引

📚 继续学习