聚合国内IT技术精华文章,分享IT技术精华,帮助IT从业人士成长

MongoDB修改监控

2019-08-15 16:10 浏览: 133 次 我要评论(0 条) 字号:

我的系统X采用MongoDB存储配置,现在我有一个需求,当某个App1修改了X的配置时,我希望能及时通知其他的App2、App3等。
在MongoDB3.6版本以前,可以利用–replSet副本配置(备注[1]),监控其产生的OpLog来获取修改变化。
在MongoDB3.6版本之后,有了一个名为Change Streams的特性(备注[2]),可以直接满足我这个需求。下面来试一试。

从这里下载MongoDB3.6版本程序:

https://www.mongodb.com/download-center?jmp=homepage#community

我c用的CentOS7.2,因此下载这个:

https://www.mongodb.com/dr/fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel70-3.6.5.tgz/download

1,在终端终端Shell-1里先将MongoDB服务运行起来。Change Streams只能在副本集或分片集群中打开,在分片集群中,必须为mongos路由打开Change Streams操作,存储引擎要求是WiredTiger存储引擎。

[root@localhost bin]# pwd
/root/mongodb-linux-x86_64-rhel70-3.6.5/bin
[root@localhost bin]# mkdir  /opt/mongo/
[root@localhost bin]# ./mongod --dbpath /opt/mongo/  --replSet "rs"

另起一个终端Shell-2,执行:
2,创建副本集:

[root@localhost bin]# ./mongo
...
> rs.initiate()
{
	"info2" : "no configuration specified. Using a default configuration for the set",
	"me" : "localhost:27017",
	"ok" : 1,
	"operationTime" : Timestamp(1527211153, 1),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1527211153, 1),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}

3,创建测试数据:

rs:PRIMARY> use demo
switched to db demo
rs:PRIMARY> db.createCollection("coll")
{
	"ok" : 1,
	"operationTime" : Timestamp(1527212586, 1),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1527212586, 1),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}
rs:PRIMARY> db.coll.insert({"test":1})
WriteResult({ "nInserted" : 1 })
rs:PRIMARY> db.coll.find().pretty()
{ "_id" : ObjectId("5b076a4c8d2767c662be613c"), "test" : 1 }

再另起一个终端Shell-3,利用mongo执行如下js:

[root@localhost mongodb-linux-x86_64-rhel70-3.6.5]# cat test.js 
conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.coll;

const changeStreamCursor = collection.watch();

pollStream(changeStreamCursor);

//this function polls a change stream and prints out each change as it comes in
function pollStream(cursor) {
  while (!cursor.isExhausted()) {
    if (cursor.hasNext()) {
      change = cursor.next();
      print(JSON.stringify(change));
    }
  }
  pollStream(cursor);
}
[root@localhost mongodb-linux-x86_64-rhel70-3.6.5]# ./bin/mongo --quiet test.js 
2018-05-24T22:04:28.533-0400 I NETWORK  [thread1] Starting new replica set monitor for rs/localhost:27017
2018-05-24T22:04:28.681-0400 I NETWORK  [thread1] Successfully connected to localhost:27017 (1 connections now open to localhost:27017 with a 5 second timeout)

回到终端Shell-2的mongo命令行,尝试修改数据:

rs:PRIMARY> db.coll.update({"test":1}, {$set:{"test":2}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

查看终端Shell-3,获得打印信息:

2018-05-24T22:04:28.681-0400 I NETWORK  [thread1] Successfully connected to localhost:27017 (1 connections now open to localhost:27017 with a 5 second timeout)
{"_id":{"_data":{"$binary":"glsHb/QAAAABRmRfaWQAZFsHakyNJ2fGYr5hPABaEATsy7A+TF1ABJv3m91HJyGqBA==","$type":"00"}},"operationType":"update","ns":{"db":"demo","coll":"coll"},"documentKey":{"_id":{"$oid":"5b076a4c8d2767c662be613c"}},"updateDescription":{"updatedFields":{"test":2},"removedFields":[]}}

插入也有类似信息:

rs:PRIMARY> db.coll.insert({"test2":1})
{"_id":{"_data":{"$binary":"glsHcNQAAAABRmRfaWQAZFsHcNSZCR9iEdNTjwBaEATsy7A+TF1ABJv3m91HJyGqBA==","$type":"00"}},"operationType":"insert","fullDocument":{"_id":{"$oid":"5b0770d499091f6211d3538f"},"test2":1},"ns":{"db":"demo","coll":"coll"},"documentKey":{"_id":{"$oid":"5b0770d499091f6211d3538f"}}}

监控还可以设置条件,具体请参考备注[4]。

[1]https://stackoverflow.com/questions/9691316/how-to-listen-for-changes-to-a-mongodb-collection
[2]https://emptysqua.re/blog/driver-features-for-mongodb-3-6/#change-streams
[3]https://dba.stackexchange.com/questions/138300/how-to-get-old-and-new-values-on-redis-notification
[4]https://www.mongodb.com/blog/post/an-introduction-to-change-streams



网友评论已有0条评论, 我也要评论

发表评论

*

* (保密)

Ctrl+Enter 快捷回复