MongoDB Aggregation: The $merge Stage (2)
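$merge writes the results of an aggregation pipeline to a specified collection, so it can sometimes be used to build materialized views. Below are some examples of $merge.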
Examples
On-Demand Materialized View: Initial Creation
If the output collection does not exist, $merge creates it. First, populate the salaries collection in the zoo database with employee and department history:
db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
])
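Then use the $group and $merge stages to create a collection named budgets in the reporting database.
Note:
- For a replica set or standalone deployment, $merge creates the output database if it does not exist.
- For a sharded cluster, the specified output database must already exist.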
db.getSiblingDB("zoo").salaries.aggregate( [
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
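The $group stage groups the salaries by fiscal_year and dept and sums them into a salaries field. The $merge stage writes the output of the $group stage to the budgets collection in the reporting database.
The budgets collection now contains the following documents: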
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
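To make the arithmetic behind these totals concrete, here is a plain JavaScript sketch (run with Node.js, not in mongosh against a server) that reproduces what the $group stage computes for the sample documents. The helper name groupSalaries is hypothetical. The sketch keeps groups in first-seen order, whereas the real $group stage does not guarantee any output order.

```javascript
// Plain JavaScript sketch (Node.js), not MongoDB code: reproduces the
// $group stage { _id: { fiscal_year, dept }, salaries: { $sum: "$salary" } }
// over the zoo.salaries sample documents.
const salaries = [
  { _id: 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
  { _id: 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
  { _id: 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
  { _id: 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
  { _id: 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
  { _id: 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
  { _id: 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
  { _id: 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
  { _id: 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
  { _id: 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 },
];

// Hypothetical helper: group by (fiscal_year, dept) and sum the salary field.
function groupSalaries(docs) {
  const totals = new Map(); // "year|dept" -> running sum, in first-seen order
  for (const { fiscal_year, dept, salary } of docs) {
    const key = `${fiscal_year}|${dept}`;
    totals.set(key, (totals.get(key) || 0) + salary);
  }
  return [...totals].map(([key, sum]) => {
    const [year, dept] = key.split("|");
    return { _id: { fiscal_year: Number(year), dept }, salaries: sum };
  });
}

const budgets = groupSalaries(salaries);
console.log(budgets);
```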
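On-Demand Materialized View: Update/Replace Data
The following example continues with the data from the previous example. The salaries collection contains employee salary and department history: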
{ "_id" : 1, "employee": "Ant", "dept": "A", "salary": 100000, "fiscal_year": 2017 },
{ "_id" : 2, "employee": "Bee", "dept": "A", "salary": 120000, "fiscal_year": 2017 },
{ "_id" : 3, "employee": "Cat", "dept": "Z", "salary": 115000, "fiscal_year": 2017 },
{ "_id" : 4, "employee": "Ant", "dept": "A", "salary": 115000, "fiscal_year": 2018 },
{ "_id" : 5, "employee": "Bee", "dept": "Z", "salary": 145000, "fiscal_year": 2018 },
{ "_id" : 6, "employee": "Cat", "dept": "Z", "salary": 135000, "fiscal_year": 2018 },
{ "_id" : 7, "employee": "Gecko", "dept": "A", "salary": 100000, "fiscal_year": 2018 },
{ "_id" : 8, "employee": "Ant", "dept": "A", "salary": 125000, "fiscal_year": 2019 },
{ "_id" : 9, "employee": "Bee", "dept": "Z", "salary": 160000, "fiscal_year": 2019 },
{ "_id" : 10, "employee": "Cat", "dept": "Z", "salary": 150000, "fiscal_year": 2019 }
The budgets collection contains the cumulative yearly budgets:
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
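During the current fiscal year (2019 in this example), new employees are added to the salaries collection and some headcount is pre-allocated for the next year: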
db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 },
{ "_id" : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 },
{ "_id" : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 },
{ "_id" : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 }
])
The following aggregation updates the budgets collection to reflect the new salary information:
db.getSiblingDB("zoo").salaries.aggregate( [
{ $match : { fiscal_year: { $gte : 2019 } } },
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
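Where:
- The $match stage selects all documents whose fiscal_year is greater than or equal to 2019.
- The $group stage groups the salaries by fiscal_year and dept and sums them into a salaries field.
- The $merge stage writes the result set to the budgets collection, replacing any document with the same _id value (the fiscal_year and dept pair). Documents with no match in the collection, here the one for fiscal year 2020, are inserted as new documents.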
After the aggregation runs, query the budgets collection:
db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )
The budgets collection now incorporates the new 2019 salary data and has a new document for fiscal year 2020:
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 }
{ "_id" : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }
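The plain JavaScript sketch below (Node.js, not mongosh) walks through this run: the $match filter, the $group sums, and the replace-or-insert decision that $merge makes. It stores each budgets document as a number keyed by a "year|dept" string, a simplification that only works because these documents hold nothing besides _id and salaries.

```javascript
// Plain JavaScript sketch (Node.js), not MongoDB code.
// [employee, dept, salary, fiscal_year] for all 14 zoo.salaries documents.
const rows = [
  ["Ant", "A", 100000, 2017], ["Bee", "A", 120000, 2017], ["Cat", "Z", 115000, 2017],
  ["Ant", "A", 115000, 2018], ["Bee", "Z", 145000, 2018], ["Cat", "Z", 135000, 2018],
  ["Gecko", "A", 100000, 2018], ["Ant", "A", 125000, 2019], ["Bee", "Z", 160000, 2019],
  ["Cat", "Z", 150000, 2019], ["Wren", "Z", 100000, 2019], ["Zebra", "A", 150000, 2019],
  ["headcount1", "Z", 120000, 2020], ["headcount2", "Z", 120000, 2020],
];

// $match { fiscal_year: { $gte: 2019 } }, then $group with $sum: "$salary".
const totals = new Map();
for (const [, dept, salary, year] of rows) {
  if (year < 2019) continue;
  const key = `${year}|${dept}`;
  totals.set(key, (totals.get(key) || 0) + salary);
}

// budgets before this run (first example), simplified to "year|dept" -> salaries.
const budgets = new Map([
  ["2017|A", 220000], ["2017|Z", 115000], ["2018|A", 215000],
  ["2018|Z", 280000], ["2019|A", 125000], ["2019|Z", 310000],
]);

// $merge with on: "_id", whenMatched: "replace", whenNotMatched: "insert".
const stats = { replaced: 0, inserted: 0 };
for (const [key, sum] of totals) {
  if (budgets.has(key)) stats.replaced += 1; // matched: replace the document
  else stats.inserted += 1; // not matched: insert it
  budgets.set(key, sum);
}

console.log(stats, Object.fromEntries(budgets));
```

Two groups (2019 A and 2019 Z) replace existing documents and one (2020 Z) is inserted, which matches the seven documents listed above.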
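Only Insert New Data
To ensure that $merge does not overwrite any existing data in the collection, set whenMatched to keepExisting or fail. The salaries collection in the zoo database contains employee salary and department history: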
{ "_id" : 1, "employee": "Ant", "dept": "A", "salary": 100000, "fiscal_year": 2017 },
{ "_id" : 2, "employee": "Bee", "dept": "A", "salary": 120000, "fiscal_year": 2017 },
{ "_id" : 3, "employee": "Cat", "dept": "Z", "salary": 115000, "fiscal_year": 2017 },
{ "_id" : 4, "employee": "Ant", "dept": "A", "salary": 115000, "fiscal_year": 2018 },
{ "_id" : 5, "employee": "Bee", "dept": "Z", "salary": 145000, "fiscal_year": 2018 },
{ "_id" : 6, "employee": "Cat", "dept": "Z", "salary": 135000, "fiscal_year": 2018 },
{ "_id" : 7, "employee": "Gecko", "dept": "A", "salary": 100000, "fiscal_year": 2018 },
{ "_id" : 8, "employee": "Ant", "dept": "A", "salary": 125000, "fiscal_year": 2019 },
{ "_id" : 9, "employee": "Bee", "dept": "Z", "salary": 160000, "fiscal_year": 2019 },
{ "_id" : 10, "employee": "Cat", "dept": "Z", "salary": 150000, "fiscal_year": 2019 }
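The orgArchive collection in the reporting database contains historical departmental organization records for past fiscal years. Archived records must not be modified.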
{ "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }
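Create a compound unique index on the fiscal_year and dept fields of orgArchive, so that there is at most one record per fiscal year and department. (When the on fields of $merge are something other than _id, the target collection must have a unique index on those fields.)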
db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )
At the end of fiscal year 2019, the salaries collection contains the following documents:
{ "_id" : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 }
{ "_id" : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 }
{ "_id" : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 }
{ "_id" : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 }
{ "_id" : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 }
{ "_id" : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 }
{ "_id" : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 }
{ "_id" : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 }
{ "_id" : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 }
{ "_id" : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 }
{ "_id" : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
{ "_id" : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
The following aggregation pipeline updates the orgArchive collection with the records for fiscal year 2019:
db.getSiblingDB("zoo").salaries.aggregate( [
{ $match: { fiscal_year: 2019 }},
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } },
{ $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } },
{ $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } }
] )
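Where:
- The $match stage selects all documents whose fiscal_year equals 2019.
- The $group stage groups the employees by fiscal_year and dept.
- The $project stage suppresses the _id field and adds separate dept and fiscal_year fields. When these documents reach $merge, a new _id field is generated for each of them automatically.
- The $merge stage writes the documents to orgArchive. If it finds an existing document with the same dept and fiscal_year values, the aggregation fails with an error.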
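To see what actually reaches $merge, the plain JavaScript sketch below (Node.js, not mongosh) mimics the first three stages for fiscal year 2019. The helper archiveDocs is hypothetical. It keeps employees in input order; on the server, the order in which $push sees documents is not guaranteed without a preceding $sort.

```javascript
// Plain JavaScript sketch (Node.js), not MongoDB code.
// [employee, dept, fiscal_year] for the zoo.salaries documents listed above.
const rows = [
  ["Ant", "A", 2017], ["Bee", "A", 2017], ["Cat", "Z", 2017], ["Ant", "A", 2018],
  ["Bee", "Z", 2018], ["Cat", "Z", 2018], ["Gecko", "A", 2018], ["Ant", "A", 2019],
  ["Bee", "Z", 2019], ["Cat", "Z", 2019], ["Wren", "Z", 2019], ["Zebra", "A", 2019],
  ["headcount1", "Z", 2020], ["headcount2", "Z", 2020],
];

// Hypothetical helper mirroring $match, $group/$push and $project { _id: 0 }.
function archiveDocs(docs, year) {
  const groups = new Map(); // "year|dept" -> group with employee names
  for (const [employee, dept, fiscalYear] of docs) {
    if (fiscalYear !== year) continue; // $match: { fiscal_year: 2019 }
    const key = `${fiscalYear}|${dept}`;
    if (!groups.has(key)) groups.set(key, { dept, fiscal_year: fiscalYear, employees: [] });
    groups.get(key).employees.push(employee); // $push: "$employee"
  }
  // $project: the output has no _id; one is generated when $merge inserts it.
  return [...groups.values()].map(({ employees, dept, fiscal_year }) => ({
    employees,
    dept,
    fiscal_year,
  }));
}

const toArchive = archiveDocs(rows, 2019);
console.log(toArchive);
```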
After the aggregation runs, the orgArchive collection contains the following documents:
{ "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ant", "Zebra" ], "dept" : "A", "fiscal_year" : 2019 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bee", "Cat", "Wren" ], "dept" : "Z", "fiscal_year" : 2019 }
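Note:
- If the orgArchive collection already contained a 2019 document for department "A" and/or "Z", the aggregation would fail with a duplicate key error, and any documents inserted before the error would not be rolled back.
- With whenMatched set to keepExisting, an existing matched document is left unchanged and no error is raised. With replace, the matched document is replaced by the new one, also without an error.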
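The difference between the three whenMatched options can be sketched in plain JavaScript (Node.js, not mongosh). A Map keyed by "dept|fiscal_year" stands in for orgArchive and its unique index, the helper mergeInto is hypothetical, and the pre-existing 2019 record for department "Z" is made-up data for the scenario in the note. The sketch writes documents in array order; the server's actual write order and batching may differ, so which documents land before a failure is not something to rely on.

```javascript
// Plain JavaScript sketch (Node.js), not MongoDB code.
// The Map plays the role of orgArchive plus its unique (dept, fiscal_year) index.
function mergeInto(archive, docs, whenMatched) {
  for (const doc of docs) {
    const key = `${doc.dept}|${doc.fiscal_year}`; // on: [ "dept", "fiscal_year" ]
    if (!archive.has(key)) {
      archive.set(key, doc); // whenNotMatched: "insert" (the default)
    } else if (whenMatched === "fail") {
      // Stop here; documents written earlier in this run stay written.
      throw new Error(`duplicate key: ${key}`);
    } else if (whenMatched === "replace") {
      archive.set(key, doc);
    }
    // "keepExisting": leave the stored document as it is.
  }
}

const incoming = [
  { employees: ["Ant", "Zebra"], dept: "A", fiscal_year: 2019 },
  { employees: ["Bee", "Cat", "Wren"], dept: "Z", fiscal_year: 2019 },
];
// Hypothetical pre-existing 2019 record for department "Z".
const makeArchive = () =>
  new Map([["Z|2019", { employees: ["Bee", "Cat"], dept: "Z", fiscal_year: 2019 }]]);

const failed = makeArchive();
let error = null;
try {
  mergeInto(failed, incoming, "fail");
} catch (e) {
  error = e.message; // "A|2019" was inserted before the error and stays
}

const kept = makeArchive();
mergeInto(kept, incoming, "keepExisting");

const replaced = makeArchive();
mergeInto(replaced, incoming, "replace");
```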
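Merge Results from Multiple Collections
By default (whenMatched: "merge"), when a result document matches a document in the target collection, $merge merges the two: fields from the result document are added to the existing document, and fields with the same name are overwritten.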
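For a single matched pair, the default "merge" behavior is a shallow, top-level field merge, similar to the $mergeObjects operator. The plain JavaScript sketch below (Node.js, not mongosh) models it with object spread; mergeMatched and the sample documents are hypothetical.

```javascript
// Plain JavaScript sketch (Node.js), not MongoDB code.
// whenMatched: "merge": top-level fields from the incoming document are added
// or overwrite; fields that only exist in the stored document are kept.
function mergeMatched(existing, incoming) {
  return { ...existing, ...incoming };
}

const existing = { _id: 1, a: 1, b: 2, meta: { x: 1 } };
const incoming = { _id: 1, b: 20, c: 30, meta: { y: 2 } };
const merged = mergeMatched(existing, incoming);

// Shallow: the nested meta document is replaced as a whole, not combined.
console.log(merged); // { _id: 1, a: 1, b: 20, meta: { y: 2 }, c: 30 }
```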
Insert quarterly purchase orders by region into the purchaseorders collection:
db.purchaseorders.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") },
{ _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") },
{ _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") },
] )
Insert quarterly sales reports by region into the reportedsales collection:
db.reportedsales.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") },
] )
Suppose the goal is a quarterly report that looks like this:
{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
You can use $merge to combine the results from the purchaseorders and reportedsales collections into a new quarterlyreport collection. First, aggregate purchaseorders:
db.purchaseorders.aggregate( [
{ $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
The $group stage groups the orders by quarter and uses $sum to add up qty into a purchased field:
{ "_id" : "2019Q2", "purchased" : 1700 }
{ "_id" : "2019Q1", "purchased" : 1200 }
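The $merge stage writes these documents to the quarterlyreport collection. If a document with the same _id (quarter) already exists, the two documents are merged; otherwise a new document is inserted.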
Query the documents in quarterlyreport:
db.quarterlyreport.find().sort( { _id: 1 } )
Result:
{ "_id" : "2019Q1", "purchased" : 1200 }
{ "_id" : "2019Q2", "purchased" : 1700 }
Similarly, run an aggregation pipeline on reportedsales and merge the sales results into the quarterlyreport collection:
db.reportedsales.aggregate( [
{ $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // total sales by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
The $group stage groups by quarter and uses $sum to store the total of qty in a sales field, producing:
{ "_id" : "2019Q2", "sales" : 500 }
{ "_id" : "2019Q1", "sales" : 1950 }
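The $merge stage writes the documents to the quarterlyreport collection. If a document with the same _id (quarter) exists, the two documents are merged; otherwise a new document is inserted.
Query the data in quarterlyreport:
db.quarterlyreport.find().sort( { _id: 1 } )
The collection now contains the following documents: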
{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
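Both runs can be replayed in plain JavaScript (Node.js, not mongosh) to confirm the numbers: each pipeline sums qty per quarter and then merges into the same report with on: "_id", whenMatched: "merge", whenNotMatched: "insert". The helpers sumByQuarter and mergeRun are hypothetical, and only the quarter and qty fields of each source document are kept.

```javascript
// Plain JavaScript sketch (Node.js), not MongoDB code.
// [quarter, qty] pairs from the purchaseorders and reportedsales documents.
const purchaseorders = [
  ["2019Q1", 200], ["2019Q1", 300], ["2019Q1", 700],
  ["2019Q2", 300], ["2019Q2", 1000], ["2019Q2", 400],
];
const reportedsales = [["2019Q1", 400], ["2019Q1", 550], ["2019Q1", 1000], ["2019Q2", 500]];

// $group: { _id: "$quarter", <field>: { $sum: "$qty" } }
function sumByQuarter(rows, field) {
  const sums = new Map();
  for (const [quarter, qty] of rows) sums.set(quarter, (sums.get(quarter) || 0) + qty);
  return [...sums].map(([quarter, total]) => ({ _id: quarter, [field]: total }));
}

// $merge: { on: "_id", whenMatched: "merge", whenNotMatched: "insert" }
function mergeRun(report, docs) {
  for (const doc of docs) {
    const existing = report.get(doc._id);
    report.set(doc._id, existing ? { ...existing, ...doc } : { ...doc });
  }
}

const quarterlyreport = new Map();
mergeRun(quarterlyreport, sumByQuarter(purchaseorders, "purchased"));
const afterFirstRun = [...quarterlyreport.values()].map((d) => ({ ...d }));
mergeRun(quarterlyreport, sumByQuarter(reportedsales, "sales"));

console.log(afterFirstRun);
console.log([...quarterlyreport.values()]);
```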
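Customize the Merge with a Pipeline
When a document matches, $merge can also apply a custom update pipeline. The whenMatched pipeline may contain only the following stages:
- $addFields and its alias $set
- $project and its alias $unset
- $replaceRoot and its alias $replaceWith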
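If whenMatched pipelines are built programmatically, a client-side pre-check against this list can catch mistakes before the command is sent. The sketch below is plain JavaScript (Node.js); ALLOWED_WHEN_MATCHED_STAGES and checkWhenMatchedPipeline are hypothetical names, and the server remains the authority on what it accepts.

```javascript
// Plain JavaScript sketch (Node.js), not a MongoDB API.
// Stages listed above as allowed in a whenMatched pipeline.
const ALLOWED_WHEN_MATCHED_STAGES = new Set([
  "$addFields", "$set", // $set is an alias of $addFields
  "$project", "$unset", // $unset is an alias of $project (field exclusion)
  "$replaceRoot", "$replaceWith", // $replaceWith is an alias of $replaceRoot
]);

// Returns the names of any stages that are not on the list (empty if valid).
function checkWhenMatchedPipeline(pipeline) {
  const rejected = [];
  for (const stage of pipeline) {
    for (const name of Object.keys(stage)) {
      if (!ALLOWED_WHEN_MATCHED_STAGES.has(name)) rejected.push(name);
    }
  }
  return rejected;
}

console.log(checkWhenMatchedPipeline([{ $addFields: { total: 1 } }]));
console.log(checkWhenMatchedPipeline([{ $set: { a: 1 } }, { $group: { _id: null } }]));
```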
The following example creates a votes collection containing daily vote tallies:
db.votes.insertMany( [
{ date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 },
{ date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 },
{ date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 },
{ date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 }
] )
Also create a monthlytotals collection containing the up-to-date monthly vote totals:
db.monthlytotals.insertOne(
{ "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 }
)
Finally, insert another day's votes into the votes collection:
db.votes.insertOne(
{ date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 }
)
The following aggregation uses a custom $merge pipeline to update the existing document in the monthlytotals collection:
db.votes.aggregate([
{ $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } },
{ $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } },
{ $merge: {
into: "monthlytotals",
on: "_id",
whenMatched: [
{ $addFields: {
thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] },
thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] }
} } ],
whenNotMatched: "insert"
} }
])
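Where:
- The $match stage selects the votes for the specified day: { "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 }
- The $project stage sets the _id field to a year-month string: { "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" }
- The $merge stage writes the document to the monthlytotals collection. If a document with a matching _id exists, the pipeline adds the thumbsup and thumbsdown votes:
  - The pipeline cannot access the fields of the aggregation result document directly. To read the result's thumbsup and thumbsdown fields, use the $$new variable, i.e. $$new.thumbsup and $$new.thumbsdown.
  - The pipeline can access the fields of the existing document in the collection directly, i.e. $thumbsup and $thumbsdown.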
After the aggregation runs, query the monthlytotals collection:
db.monthlytotals.find()
Result:
{ "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }
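The arithmetic inside the custom pipeline can be checked with a plain JavaScript sketch (Node.js, not mongosh). In it, existing stands for the monthlytotals document ($thumbsup, $thumbsdown) and newDoc for the aggregation result that the pipeline sees as $$new. monthKey and addVotes are hypothetical helpers, and monthKey assumes UTC, which is also the default time zone of $dateToString.

```javascript
// Plain JavaScript sketch (Node.js), not MongoDB code.
// $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } } } in UTC.
function monthKey(date) {
  return date.toISOString().slice(0, 7); // "YYYY-MM"
}

// whenMatched: [ { $addFields: { thumbsup: { $add: [ "$thumbsup", "$$new.thumbsup" ] }, ... } } ]
function addVotes(existing, newDoc) {
  return {
    ...existing, // fields of the stored document ($thumbsup, $thumbsdown)
    thumbsup: existing.thumbsup + newDoc.thumbsup, // newDoc plays the role of $$new
    thumbsdown: existing.thumbsdown + newDoc.thumbsdown,
  };
}

const vote = { date: new Date("2019-05-07"), thumbsup: 14, thumbsdown: 10 };
const newDoc = { _id: monthKey(vote.date), thumbsup: vote.thumbsup, thumbsdown: vote.thumbsdown };
const existing = { _id: "2019-05", thumbsup: 26, thumbsdown: 31 };

// on: "_id": apply the pipeline when matched, otherwise insert newDoc as is.
const result = newDoc._id === existing._id ? addVotes(existing, newDoc) : newDoc;
console.log(result); // { _id: '2019-05', thumbsup: 40, thumbsdown: 41 }
```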
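Customize the Merge with Variables
Variables can also be used in the whenMatched field of the $merge stage, but they must be defined before use. There are two ways to define them:
- with let in the $merge stage
- with let in the aggregate command (supported starting in MongoDB 5.0)
To use a variable in whenMatched, prefix its name with $$, as in $$<variable_name>, for example $$year. If the variable is a document, you can also reference one of its fields with $$<variable_name>.<field>, for example $$year.month.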
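The $$<variable_name>.<field> syntax amounts to a path lookup on the variable's value. The sketch below (plain JavaScript, Node.js) illustrates that lookup; resolveVariable and the sample value of year are hypothetical, and the error for an unknown name only mirrors the rule that variables must be defined before use.

```javascript
// Plain JavaScript sketch (Node.js), not a MongoDB API.
function resolveVariable(ref, vars) {
  if (!ref.startsWith("$$")) throw new Error(`not a variable reference: ${ref}`);
  const [name, ...path] = ref.slice(2).split(".");
  if (!Object.prototype.hasOwnProperty.call(vars, name)) {
    throw new Error(`undefined variable: ${name}`); // must be defined before use
  }
  // Walk the field path; a missing field yields undefined.
  return path.reduce((value, field) => (value == null ? undefined : value[field]), vars[name]);
}

const vars = { year: { value: "2020", month: "05" } }; // hypothetical variable
console.log(resolveVariable("$$year.month", vars)); // 05
```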
Use Variables Defined in the $merge Stage
Define a variable with let in the $merge stage and use it in the whenMatched field:
db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {}
} )
db.cakeSales.find()
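Explanation:
- Create the cakeSales collection and insert a document.
- Run the aggregate command, which defines a year variable with let and, in whenMatched, assigns year to the salesYear field.
- Query the documents in cakeSales.
Output: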
{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }
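Use Variables Defined in the Aggregate Command
Starting in MongoDB 5.0, you can define variables with let in the aggregate command and reference them in the whenMatched field of the $merge stage.
Example: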
db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
whenMatched: [ {
$addFields: { "salesYear": "$$year" } }
] }
}
],
cursor: {},
let : { year: "2020" }
} )
db.cakeSales.find()
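Explanation:
- Create the cakeSales collection and insert a document.
- Run the aggregate command, which defines a year variable with let at the command level and, in whenMatched, assigns year to the salesYear field.
- Display the cakeSales documents.
Output: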
{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }
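Define Variables in Both $merge and the Aggregate Command
Variables can be defined in both the $merge stage and the aggregate command. If a variable with the same name is defined in both places, the $merge stage variable takes precedence.
In the following example, the $merge stage defines a year variable with the value "2020", and the aggregate command also defines a year variable with the value "2019". Run the following command: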
db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)
db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {},
let : { year: "2019" }
} )
db.cakeSales.find()
Result:
{
_id: 1,
flavor: 'chocolate',
salesTotal: 1580,
salesTrend: 'up',
salesYear: '2020'
}
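The precedence rule behaves like an inner scope shadowing an outer one. In the plain JavaScript sketch below (Node.js, not mongosh), effectiveVariables and applyWhenMatched are hypothetical helpers that layer the $merge-level let over the command-level let and then apply { $addFields: { salesYear: "$$year" } }.

```javascript
// Plain JavaScript sketch (Node.js), not MongoDB code.
// Command-level let is the outer scope; $merge-level let shadows it.
function effectiveVariables(commandLet = {}, mergeLet = {}) {
  return { ...commandLet, ...mergeLet };
}

// whenMatched: [ { $addFields: { salesYear: "$$year" } } ]
function applyWhenMatched(doc, vars) {
  return { ...doc, salesYear: vars.year };
}

const cake = { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" };

const both = applyWhenMatched(cake, effectiveVariables({ year: "2019" }, { year: "2020" }));
const commandOnly = applyWhenMatched(cake, effectiveVariables({ year: "2019" }));
const mergeOnly = applyWhenMatched(cake, effectiveVariables(undefined, { year: "2020" }));

console.log(both.salesYear, commandOnly.salesYear, mergeOnly.salesYear); // 2020 2019 2020
```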