lynn是什么意思| 保险子是什么| 人丹是什么药| 优思明是什么药| 血压低会出现什么症状| 绚丽夺目的意思是什么| 大便颗粒状是什么原因造成的| 芍药花什么时候开花| 什么是直系亲属| 女人为什么会得霉菌| 哈西奈德溶液治什么病| 静脉曲张挂什么科| 口腔科主要看什么| 左耳烫代表什么预兆| 女人左眼跳是什么预兆| 未土是什么土| amp是什么| 中耳炎去药店买什么药| 苏麻为什么不嫁给康熙| 聪明的近义词是什么| 十月二十二是什么星座| 为什么会有血管瘤| 蛇冲什么生肖| 鼻子旁边的痣代表什么| 什么情况下需要根管治疗| 主动脉迂曲是什么意思| 斯德哥尔摩是什么意思| 肠道感染用什么抗生素| 咳嗽可以吃什么水果| 不孕不育应检查什么| 膝关节弹响是什么原因| 阴部毛变白是什么原因| 咆哮是什么意思| 出恭什么意思| 什么情况下会得甲亢| 咽炎咳嗽吃什么药| 人为什么会得甲母痣| 月指什么生肖| 什么是男人| 初潮什么意思| 胸口长痘痘是什么原因| alin是什么意思| 什么什么不安| 打开图片用什么软件| 胃泌素偏低是什么原因| 山东简称是什么| 嬛嬛一袅楚宫腰什么意思| 尿酸降低是什么意思| 千什么百什么| 查血糖血脂挂什么科| 眼压高滴什么眼药水| 刚柔并济是什么意思| 免疫缺陷是什么意思| 县常委什么级别| 11月2号是什么星座| 男票是什么意思| 乳清粉是什么| 女人吃芡实有什么好处| 良缘是什么意思| 自主能力是什么意思| 螺子黛是什么| 女真人是什么民族| 北边是什么生肖| 傲娇是什么意思| 喝酒吃头孢有什么反应| 肠胀气是什么原因引起的| 垂是什么意思| hoegaarden是什么啤酒| 贫血喝什么口服液| 来月经是黑色的是什么原因| 花卉是什么意思| hs医学上是什么意思| 女人为什么会叫床| 六月二十日是什么日子| 脐动脉2条是什么意思| 内蒙古代叫什么| 澳门使用什么货币| 白茶什么样的好| 很黄很暴力是什么意思| 什么时候同房容易怀孕| 拾荒者是什么意思| 独美是什么意思| 哈伦裤配什么上衣| 唯女子与小人难养也什么意思| 下眼睑跳动是什么原因| 学信网上的报告编号是什么| 甲状腺看什么科| 什么人容易长智齿| 恐惧症吃什么药最好| 217是什么意思| 母亲节要送什么礼物| 空腹不能吃什么| 胃酸吃什么| 憨包是什么意思| 子宫息肉有什么危害| 前列腺炎有什么症状表现| 葛根粉吃了有什么作用| 白带发黄有异味用什么药| 512是什么星座| 肺结节钙化是什么意思| 开飞机需要什么驾照| 脆豆腐是什么做的| 自诩是什么意思| 乌龟肺炎用什么药| 冲击波治疗有什么效果| 鱼露是什么| 眼压是什么意思| 深圳市市长什么级别| 喝白糖水有什么好处和坏处| 精液是什么味道的| 什么是省控线| 什么书最香| 慰安妇是什么| 内脂是什么| 糖尿病能吃什么零食| 小鹦鹉吃什么| 护照和签证有什么区别| 肝郁气滞吃什么药好| 血糖偏高能吃什么水果| 梦到前任预示着什么| 学中医需要什么学历| pt指什么| 我国最早的中医学专著是什么| 去皱纹用什么方法最好和最快| 情绪化什么意思| 风湿类风湿有什么区别| 长期熬夜有什么坏处| 手发胀是什么前兆| 高血压早餐吃什么好| 什么是传染性软疣| 满面红光是什么意思| 提心吊胆是什么生肖| 拉肚子吃什么药| 林俊杰为什么不结婚| 判官是什么意思| 女儿红属于什么酒| 缺钾吃什么药| 处级是什么级别| 一字之师是什么意思| 嗤之以鼻是什么意思| 黄花菜不能和什么一起吃| 什么是应激反应| 小叶增生吃什么药好| 71年出生属什么生肖| 三维重建是什么意思| 宰相相当于现在什么官| 藏红花泡水喝有什么功效和作用| 眼球出血是什么原因引起的| 经常胸闷是什么原因| 泉中水是什么生肖| 智齿是什么牙| 什么叫做焦虑症| 左眉毛上有痣代表什么| 虹膜是什么意思| 王母娘娘叫什么名字| 女人吃桃子有什么好处和坏处| 虫介念什么| 如日中天是什么生肖| 两棵树是什么牌子| 康普茶是什么| 屁股痛是什么引起的| 头顶秃了一小块是什么原因怎么办| 晚五行属什么| 咸肉烧什么好吃| 转氨酶高吃什么食物降得快| 女人腰疼是什么妇科病| 女人贫血吃什么补血最快| 凹陷性疤痕用什么药膏| 天妒英才是什么意思| 人参果什么季节成熟| 切除阑尾对身体有什么影响| 什么是极光| 果糖胺是什么意思| 打美国电话前面加什么| 来事吃什么水果好| brunch是什么意思| 惊厥是什么原因引起的| 近亲结婚生的孩子会得什么病| 巧克力囊肿有什么症状表现| 消融手术是什么意思| 腮腺炎的症状是什么| 头疼头胀是什么原因| 空窗期是什么意思| 伤口发炎化脓用什么药| 用什么锅炒菜对人体健康更有益| 什么水果不能上供| 什么是鼻息肉| 1999属什么生肖| 钢铁锅含眼泪喊修瓢锅这是什么歌| 东南大学什么专业最牛| 来大姨妈吃什么水果好| 质粒是什么| 颈椎脑供血不足吃什么药| 什么是沙龙| 女兔配什么属相最好| 潜血是什么意思| 离婚要带什么| 5月17日是什么星座| 总流鼻血是什么原因| 天上的月亮是什么生肖| 吃纳豆有什么好处| 孤独症有什么表现| 腰部酸胀是什么原因| 梧桐叶像什么| 一级军士长什么待遇| 笃定什么意思| 总打哈欠是什么原因| 卉字五行属什么| 食道不舒服挂什么科| 1990属什么生肖| 4月18号是什么星座| 江西景德镇有什么好玩的地方| 广西狗肉节是什么时候| 积劳成疾的疾是什么意思| fsa是什么意思| 两个子是什么字| 孕晚期缺铁对胎儿有什么影响| 甲状腺实性结节什么意思| 瑶浴spa是什么意思| 入职offer是什么意思| 囊肿什么意思| 意大利用什么货币| 什么怎么读| 大姨妈不能吃什么水果| 大姨妈期间不能吃什么东西| 三本是什么学历| 土豆粉是什么做的| 每次睡觉都做梦为什么| 什么布料最好| 其实不然是什么意思| 水痘是什么| 枕大神经痛吃什么药| 鼻窦炎吃什么药好得快| 和田玉对身体有什么好处| 涤纶是什么布料| 丽江机场叫什么名字| 急性心力衰竭的急救措施是什么| 乳腺增生是什么意思| 吃什么补充雌激素| 世侄是什么意思| 石骨症是什么病| 失心疯是什么意思| 梦见水果是什么意思| 肠梗阻是什么病| 胃食管反流能吃什么水果| 心机血缺血是什么症状| 斐乐手表属于什么档次| 汪星人什么意思| 十二月七号是什么星座| 犬瘟热是什么症状| 纠缠什么意思| 胃息肉是什么原因造成的| 梦见着火了是什么征兆| 什么叫应届毕业生| 月德合是什么意思| 过指什么生肖| 吃避孕药有什么好处| 男生学什么技术吃香| 气短吃什么药立马见效| 阴阳失调是什么意思| ml什么单位| 富裕是什么意思| 20点是什么时辰| 睡觉梦到蛇是什么意思| 百度
Docs 菜单
Docs 主页
/
数据库手册

Quick Any2Ico(图片转换成图标) V2.2.3.0绿色版

变更流允许应用程序访问实时数据更改,从而避免事先手动追踪 oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。

从 MongoDB 5.1 开始,我们对变更流进行了优化,提高了资源利用率,并加快了某些聚合管道阶段的处理速度。

变更流可用于副本集分片集群

变更流包含在Stable API V1 中。 但是, Stable APIV 中不包含 showExpandedEvents 选项。1

连接 change stream 可以使用带有+srv连接选项的 DNS 种子列表,也可以在连接字符串中单独列出服务器。

如果驱动程序与变更流失去连接或连接中断,它则会尝试通过集群中具有匹配读取偏好的其他节点与变更流重新建立连接。如果驱动程序未找到具有正确读取偏好的节点,则会引发异常。

有关更多信息,请参阅连接字符串 URI 格式。

可以针对如下情况打开变更流:

目标
说明

集合

可以为单个集合(除 system 集合,adminlocalconfig 数据库中的任何集合)打开变更流游标。

本页上的示例使用 MongoDB 驱动程序打开并使用单个集合的变更流游标。另请参阅 mongosh 方法 db.collection.watch()

数据库

您可以为单个数据库(不包括 adminlocalconfig 数据库)打开变更流游标,以监视其所有非系统集合的更改。

有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 mongosh 方法 db.watch()

部署

您可以为部署(副本集或分片集群)打开变更流游标,以监控所有数据库(除 adminlocalconfig 外)中对所有非系统集合的变更。

有关这种 MongoDB 驱动程序方法,请参阅您的驱动程序文档。另请参阅 mongosh 方法 Mongo.watch()

注意

变更流示例

本页上的示例使用 MongoDB 驱动程序说明如何为集合打开变更流游标以及如何使用变更游标。

如果针对数据库打开的活动变更流的数量超过连接池大小,则可能会出现通知延迟。在等待下一事件的时间段内,每个变更流均会使用一个连接并对该变更流执行 getMore 操作。为避免出现延迟问题,应确保池大小应大于已打开的变更流数量。有关详情,请参阅 maxPoolSize 设置。

在分片集群上打开变更流时:

  • mongos每个分片上创建单独的变更流。无论变更流是否针对特定的分片密钥范围,都会出现这种行为。

  • mongos 收到更改流结果时,它会对这些结果进行排序和筛选。如有必要,mongos 还会执行 fullDocument 查找。

为获得最佳性能,请在变更流中限制对 $lookup 查询的使用。

要打开变更流:

  • 对于副本集,您可以从任何承载数据的成员发出打开 change stream 操作。

  • 对于分片集群,必须从 mongos 中发出打开变更流操作。

以下示例将为某一集合打开一个变更流,并对游标进行迭代以检索变更流文档。[1]


? 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


下面的C示例假定您已连接到MongoDB副本集并访问了包含 inventory集合的数据库。

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

下面的C#示例假定您已连接到MongoDB副本集并访问了包含 集合的数据库 inventory

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

以下 Go 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

以下 Java 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

下面的Kotlin示例假设您已连接到MongoDB副本集,并且可以访问权限包含inventory集合的数据库。 要学习;了解有关完成这些任务的更多信息,请参阅Kotlin协程驱动程序数据库和集合指南。

val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}

以下示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory 集合的数据库

cursor = db.inventory.watch()
document = await cursor.next()

以下 Node.js 示例假设您已连接到 MongoDB 副本集,并且已访问包含 inventory 集合的数据库

以下示例使用流来处理变更事件。

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream
.on('change', next => {
// process next document
})
.once('error', () => {
// handle error
});

或者,您也可以使用迭代器处理变更事件:

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

ChangeStream 扩展了 EventEmitter

以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

以下 Python 示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

cursor = db.inventory.watch()
next(cursor)

以下示例假设您已连接到 MongoDB 副本集,并且已访问一个包含 inventory 集合的数据库

cursor = inventory.watch.to_enum
next_change = cursor.next

下面的 Swift (异步) 示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory 集合的数据库

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}

以下 Swift(同步)示例假设您已连接到 MongoDB 副本集,并已访问包含 inventory 集合的数据库

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

若要从游标检索数据更改事件,请迭代使用变更流游标。有关变更流事件的信息,请参阅变更事件。

变更流游标保持打开状态,直到出现以下任一情况:

  • 游标已明确关闭。

  • 发生失效事件;例如删除或重命名集合。

  • 与MongoDB 部署的连接关闭或超时。有关更多信息,请参阅行为。

  • 如果部署是分片集群,则分片删除可能会导致打开的变更流游标关闭。关闭的变更流游标可能无法完全恢复。

注意

未关闭游标的生命周期取决于语言。

[1] 您可以指定 startAtOperationTime 在特定时间点打开游标。如果指定的起点在过去,它必须在 oplog 的时间范围内。

? 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

pipeline列表包含一个$match阶段,用于筛选符合以下一个或两个条件的任何操作:

  • username 值为 alice

  • operationType 值为 delete

pipeline 传递给 watch() 方法会指示变更流在通过指定的 pipeline 传递通知后返回通知。

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

val pipeline = listOf(
Aggregates.match(
or(
eq("fullDocument.username", "alice"),
`in`("operationType", listOf("delete"))
)
))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

pipeline列表包含一个$match阶段,用于筛选符合以下一个或两个条件的任何操作:

  • username 值为 alice

  • operationType 值为 delete

pipeline 传递给 watch() 方法会指示变更流在通过指定的 pipeline 传递通知后返回通知。

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

以下示例使用流来处理变更事件。

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});

或者,您也可以使用迭代器处理变更事件:

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

在配置 change stream 时,您可以通过提供以下一个或多个管道阶段的数组来控制 change stream 输出

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

提示

变更流事件文档的_id字段充当恢复令牌。 请勿使用管道修改或删除变更流事件的_id字段。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

请参阅变更事件以了解有关变更流响应文档格式的更多信息。

默认情况下,变更流仅在更新操作期间返回字段的增量。不过,您可以配置变更流以返回已更新文档的最新多数提交版本。


? 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。


要返回已更新文档的当前多数提交版本,请将带有 "updateLookup" 值的 "fullDocument" 选项传递给 mongoc_collection_watch 方法。

在下面的示例中,所有更新操作通知都包含一个 fullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

要返回更新文档的最新多数提交版本,请将 "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 FullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

要返回已更新文档的当前多数提交版本,请使用 SetFullDocument(options.UpdateLookup) 变更流选项。

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

要返回已更新文档的最新多数提交版本,请将 FullDocument.UPDATE_LOOKUP 传递给 db.collection.watch.fullDocument() 方法。

在下面的示例中,所有更新操作通知都包含一个 FullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

要返回更新文档的最新多数提交版本,请将FullDocument.UPDATE_LOOKUP 传递给 ChangeStreamFlow.fullDocument()方法。

在下面的示例中,所有更新操作通知都包含一个 FullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}

要返回更新文档的最新多数提交版本,请将 full_document='updateLookup' 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 `full_document 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

要返回更新文档的最新多数提交版本,请将 { fullDocument: 'updateLookup' } 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 fullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

以下示例使用流来处理变更事件。

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});

或者,您也可以使用迭代器处理变更事件:

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

要返回更新文档的最新多数提交版本,请将 "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" 传递给 db.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 fullDocument 字段,该字段表示受更新操作影响的文档的当前版本。

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

要返回更新文档的最新多数提交版本,请将 full_document='updateLookup' 传递给 db.collection.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 full_document 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)

要返回更新文档的最新多数提交版本,请将 full_document: 'updateLookup' 传递给 db.watch() 方法。

在下面的示例中,所有更新操作通知都包含一个 full_document 字段,该字段表示受更新操作影响的文档的当前版本。

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

要返回已更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup) 传递给 watch() 方法。

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

要返回已更新文档的最新多数提交版本,请将 options: ChangeStreamOptions(fullDocument: .updateLookup) 传递给 watch() 方法。

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

注意

如果在更新操作之后但在查找之前有一个或多个多数提交操作修改了更新的文档,则返回的完整文档可能显著不同于更新操作时的文档。

但是,变更流文档中包含的增量始终正确地描述应用于该变更流事件的被监控集合更改。

如果以下任一条件为真,则更新事件的 fullDocument 字段可能会缺失:

  • 如果文档被删除,或者集合在更新和查找之间被删除。

  • 如果更新更改了该集合分片键中至少一个字段的值。

请参阅变更事件以了解有关变更流响应文档格式的更多信息。

在打开游标时将恢复令牌指定为 resumeAfter startAfter,借此恢复变更流。

您可以在打开游标时将恢复令牌传递给 resumeAfter ,从而在特定事件发生后恢复 change stream。

请参阅恢复令牌以了解有关恢复令牌的更多信息。

重要

  • 如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。

  • 在某一无效事件(例如,集合删除或重命名)关闭变更流后,您无法使用 resumeAfter 来恢复变更流。请改为使用 startAfter无效事件后启动新的变更流。

在下面的示例中,resumeAfter 选项会附加到流选项,以便在流被销毁后重新创建流。将 _id 传递给变更流会尝试在指定的操作之后开始恢复通知。

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}

在下面的示例中,resumeToken 是从最后一个变更流文档中检索的,并作为选项传递给 Watch() 方法。将 resumeToken 传递给 Watch() 方法,会指示变更流尝试在恢复令牌中指定的操作之后开始恢复通知。

var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();

您可以使用ChangeStreamOptions.SetResumeAfter来指定变更流的恢复令牌。如果设置了 resumeAfter 选项,变更流将在恢复令牌中指定的操作后恢复通知。SetResumeAfter 采用的值必须解析为恢复令牌,例如以下示例中的的 resumeToken

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.Current

您可以使用 resumeAfter() 方法在恢复令牌中指定的操作后恢复通知。resumeAfter() 方法采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

您可以使用 ChangeStreamFlow.resumeAfter()方法,以便在执行恢复令牌中指定的操作后恢复通知。 resumeAfter()方法采用的值必须解析为恢复令牌,示例下例中的resumeToken变量。

val resumeToken = BsonDocument()
val job = launch {
val changeStream = collection.watch()
.resumeAfter(resumeToken)
changeStream.collect {
println(it)
}
}

您可以使用 resume_after 修饰符在恢复令牌中指定的操作后恢复通知。resume_after 修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream
.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream
.on('change', next => {
processChange(next);
})
.once('error', error => {
// handle error
});
})
.once('error', error => {
// handle error
});

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 $resumeToken

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

您可以使用 resume_after 修饰符在恢复令牌中指定的操作后恢复通知。resume_after 修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)

您可以使用 resume_after 修饰符在恢复令牌中指定的操作后恢复通知。resume_after 修饰符采用的值必须解析为恢复令牌,例如以下示例中的 resume_token

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}

您可以使用 resumeAfter 选项在恢复令牌中指定的操作后恢复通知。resumeAfter 选项采用的值必须解析为恢复令牌,例如以下示例中的 resumeToken

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

您可在打开游标时将恢复令牌传递给 startAfter,从而在特定事件之后启动新的变更流。与 resumeAfter 不同,startAfter 可在出现无效事件之后通过创建新的变更流来恢复通知。

请参阅恢复令牌以了解有关恢复令牌的更多信息。

重要

  • 如果时间戳位于过去,oplog 必须有足够的历史记录来定位与令牌或时间戳相关的操作。

恢复令牌可从多个来源获取:

说明

更改事件通知包含针对 _id 字段的恢复词元:

$changeStream 聚合阶段在 cursor.postBatchResumeToken 字段中包含恢复令牌。

该字段仅在使用 aggregate 命令时显示。

getMore 命令在 cursor.postBatchResumeToken 字段中包含一个恢复令牌。

从 MongoDB 4.2 开始,如果变更流聚合管道修改了事件的 _id 字段,则变更流会引发异常。

提示

MongoDB 提供了“代码段”,这是 mongosh 的扩展,用于解码十六进制编码的恢复令牌。

您可以从 mongosh 安装并运行 resumetoken

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

如果系统上安装了 npm,那么您还可以在命令行中运行 resumetoken (并且不使用 mongosh):

npx mongodb-resumetoken-decoder <RESUME TOKEN>

请参阅以下内容了解详细信息:

更改事件通知包含针对 _id 字段的恢复令牌:

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2025-08-06T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

使用 aggregate 命令时,$changeStream 聚合阶段在 cursor.postBatchResumeToken 字段中包含恢复令牌:

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

getMore 命令还在 cursor.postBatchResumeToken 字段中包含一个恢复令牌:

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

变更流对于采用业务依赖型系统的基础设施很有益处,因为数据更改一旦变为持久更改,它就会通知下游系统。例如,在实施提取、转换和加载 (ETL) 服务、跨平台同步、协作功能以及通知服务时,变更流可为开发人员节省时间。

对于在自管理部署上执行身份验证授权的部署:

  • 要打开针对特定集合的变更流,应用程序必须具有对相应集合授予 changeStreamfind 动作的特权。

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • 要在单个数据库上打开变更流,应用程序必须具有对数据库中所有非 system 集合授予 changeStreamfind 动作的特权。

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • 要在整个部署中打开变更流,应用程序必须具有对部署中所有数据库的所有非 system 集合授予 changeStreamfind 动作的特权。

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

变更流仅在数据发生更改时通知副本集中的大多数数据承载节点。这可确保通知仅由大多数已提交且在故障情况下持续存在的更改触发。

例如,考虑一个 3 节点副本集,针对主节点打开了变更流游标。如果客户端发出插入操作,则只有在插入持续到大多数数据承载节点后,变更流才会将数据更改通知应用程序。

如果某个操作与事务相关联,则变更事件文档包括 txnNumberlsid

除非提供了显式排序规则,否则变更流使用 simple 二进制比较。

从 MongoDB 5.3 开始,在范围迁移期间,不会为孤立文档的更新生成变更流事件。

从 MongoDB 6.0 开始,可使用变更流事件来输出更改前后的文档版本(文档前映像和后映像):

如果图像属于以下情况,则前像和后像不可用于变更流事件

  • 在文档更新或删除操作时未对集合启用。

  • expireAfterSeconds 中设置的前像和后像保留时间后之后被删除。

    • 以下示例将整个集群上的 expireAfterSeconds 设置为 100 秒:

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • 以下示例返回当前的 changeStreamOptions 设置,包括 expireAfterSeconds

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • expireAfterSeconds 设置为 off 可使用默认保留策略:将保留前像和后像,直到从 oplog 中删除对应的变更流事件。

    • 如果变更流事件从 oplog 中删除,则无论 expireAfterSeconds 前映像和后映像保留时间如何,相应的前映像和后映像也会被删除。

其他考量:

  • 启用前像和后像会占用存储空间并增加处理时间。仅在需要时启用前像和后像。

  • 将变更流事件大小限制为小于 16 MiB。要限制事件大小,您可以:

    • 将文档大小限制为 8 MB。如果其他 change stream 事件字段(例如 updateDescription)不是很大,则可以在 change stream 输出中同时请求更新前的文档和更新后的文档。

    • 如果其他变更流事件字段(例如 updateDescription)并不大,则仅请求变更流输出中最多 16 MiB 的文档的后像。

    • 在以下情况下,仅请求变更流输出中最多 16 MiB 的文档的前像:

      • 文档更新仅影响文档结构或内容的一小部分,

      • 不会引起 replace 变更事件。replace 事件始终包含后像。

  • 要请求前图像,请在db.collection.watch()中将fullDocumentBeforeChange设置为requiredwhenAvailable。要请求后图像,您可以使用相同的方法设置fullDocument

  • 前像被写入 config.system.preimages 集合。

    • config.system.preimages 集合可能会变大。要限制集合大小,可如前文所示为前映像设置 expireAfterSeconds 时间。

    • 前像由后台进程异步删除。

重要

向后不兼容的功能

从 MongoDB 6.0 开始,如果您将文档前图像和后图像用于 change stream,则必须使用 collMod 命令为每个集合禁用 changeStreamPreAndPostImages,然后才能降级到早期 MongoDB 版本。

提示

有关变更流输出的完整示例,请参阅使用文档前像和后像的变更流

后退

限制

在此页面上

抱怨是什么意思 肚子痛去医院挂什么科 皂角米有什么功效 血压高吃什么食物好 大小脸是什么原因造成的
很无奈是什么意思 情不自禁的禁是什么意思 三点水是什么字 头发一半白一半黑是什么原因 直肠ca代表什么病
属猴和什么属相相冲 喜欢吃肉的动物是什么生肖 腰椎挂什么科 比基尼是什么 血窦是什么意思
检查淋巴挂什么科 抬举征阳性是什么意思 40不惑是什么意思 口炎念什么 俄罗斯人是什么人种
甲醛什么气味hcv8jop4ns1r.cn 左进右出有什么讲究hcv8jop3ns3r.cn 用什么泡脚可以活血化瘀疏通经络hcv7jop9ns5r.cn 头不由自主的轻微晃动是什么病hcv8jop6ns0r.cn 89年的蛇是什么命hcv8jop5ns8r.cn
逝者如斯夫什么意思hcv8jop3ns6r.cn 什么是cdhuizhijixie.com 三氯蔗糖是什么东西liaochangning.com 康斯坦丁是什么意思hcv9jop8ns3r.cn 一什么湖水hcv7jop5ns3r.cn
什么是脚气hcv9jop3ns6r.cn 人少了一魄什么反应0297y7.com 胆固醇是什么东西hcv9jop4ns8r.cn 婴儿便秘怎么办什么方法最有效hcv8jop2ns2r.cn 911是什么电话hcv8jop7ns2r.cn
小龙虾什么季节吃最好hcv8jop3ns2r.cn 前庭神经炎挂什么科bysq.com 心病有什么症状hcv9jop6ns9r.cn 排骨和什么一起炖好吃hcv7jop6ns9r.cn 多囊卵巢综合症吃什么药imcecn.com
百度