一聚教程网:一个值得你收藏的教程网站

最新下载

热门教程

MongoDB数据库复制集同步原理与例子

时间:2022-06-29 10:34:09 编辑:袖梨 来源:一聚教程网

同步过程

选取从哪个节点同步后,拉取oplog
1.Applies the op执行这个op日志
2.Writes the op to its own oplog (also local.oplog.rs)将这个op日志写入到自己的oplog中
3.Requests the next op请求下一个op日志secondary节点同步到哪了
secondary节点同步到哪了

主节点根据从节点获取oplog的时间戳可以判断数据同步到哪了

How does primary know where secondary is synced to? Well, secondary is querying primary‘s oplog for more results. So, if secondary requests an op written at 3pm, primary knows seconday has replicated all ops written before 3pm.
So, it goes like:
1.Do a write on primary.
2.Write is written to the oplog on primary, with a field “ts” saying the write occurred at time t.
3.{getLastError:1,w:2} is called on primary. primary has done the write, so it is just waiting for one more server to get the write (w:2).
4.secondary queries the oplog on primary and gets the op
5.secondary applies the op from time t
6.secondary requests ops with {ts:{$gt:t}} from primary‘s oplog
7.primary updates that secondary has applied up to t because it is requesting ops > t.
8.getLastError notices that primary and secondary both have the write, so w:2 is satisfied and it returns.
同步原理
如果A从B同步数据,B从C同步,C怎么知道A同步到哪了?看oplog读取协议:
当A从B同步数据,B对A说,我从年度oplog同步数据,如果你有写操作,告诉我一下。
B回答,我不是主节点,等我转发一下;B就跟主节点C说,就当做我是A,我代表A从你这同步数据。这时B与主节点C有两个连接,一个是B自己的,一个是代表A的。
A向B请求ops(写操作),B就转向C,这样来完成A的请求。
A            B          C
<====>
<====> <---->
<====> 是”真正”的同步连接. <----> “ghost” connection,B代表A与C的连接。
初始化同步
新增成员或者重做同步的时候,会进行初始化同步。
如下7步:
1.Check the oplog. If it is not empty, this node does not initial sync, it just starts syncing normally. If the oplog is empty, then initial sync is necessary, continue to step #2:检查oplog,如果空的,需要进行初始化同步,否则进行普通的同步。
2.Get the latest oplog time from the source member: call this time start.取同步来源节点最新的oplog time,标记为start
3.Clone all of the data from the source member to the destination member.复制所有数据到目标节点
4.Build indexes on destination. 目标节点建索引,2.0版本包含在复制数据步骤里,2.2在复制数据后建索引。
5.Get the latest oplog time from the sync target, which is called minValid.取目标节点最新的oplog time,标记为minValid
6.Apply the sync target’s oplog from start to minValid.在目标节点执行start 到minValid之间的oplog
7.Become a “normal” member (transition into secondary state).成为正常的成员
个人理解,start 到minValid之间的oplog是复制过来的没有执行的oplog,没有完成最终一致性的那部分,就是一个oplog replay的过程。
查看源码rs_initialsync.cpp,同步初始化步骤如下:
/**
Do the initial sync for this member. There are several steps to this process:
*
Record start time.
Clone.
Set minValid1 to sync target’s latest op time.
Apply ops from start to minValid1, fetching missing docs as needed.
Set minValid2 to sync target’s latest op time.
Apply ops from minValid1 to minValid2.
Build indexes.
Set minValid3 to sync target’s latest op time.
Apply ops from minValid2 to minValid3.
*
At that point, initial sync is finished. Note that the oplog from the sync target is applied
three times: step 4, 6, and 8. 4 may involve refetching, 6 should not. By the end of 6,
this member should have consistent data. 8 is “cosmetic,” it is only to get this member
closer to the latest op time before it can transition to secondary state.
*/

clone data 复制数据的过程:


for each db on sourceServer:
    for each collection in db:
        for each doc in db.collection.find():
             destinationServer.getDB(db).getCollection(collection).insert(doc)
初始化同步特点
好处:数据更紧凑,节省磁盘空间,因为所有操作都是insert。注意padding factor会设置为1。
不好的地方:同步速度太慢。使用fsync+lock 加写锁复制数据文件同步更快。
另外,mongodump/mongorestore来恢复不带oplog,实际上不太适合作为“从备份恢复”的策略。
从哪个成员来同步数据(Who to sync from)
MongoDB初始化同步数据的时候,可能从主节点同步,也可能是从从节点同步,根据最邻近原则,选择最邻近节点去同步数据;

By default, the member syncs from the the closest member of the set that is either the primary or another secondary with more recent oplog entries. This prevents two secondaries from syncing from each other.
http://docs.mongodb.org/manual/core/replication-internals/

如在上一篇文章提到的日志里[rsSync] replSet syncing to: 10.0.0.106:20011
这里syncing to 实际上是syncing from的意思,由于版本兼容原因沿用,正如kristina chodorow 所说’Backwards compatibility sucks’.
Replica Sets通过选择最邻近的节点(基于ping值),通过如下算法选择从哪个节点同步:
for each member that is healthy:
if member[state] == PRIMARY
add to set of possible sync targets


if member[lastOpTimeWritten] > our[lastOpTimeWritten]
    add to set of possible sync targets
sync target = member with the min ping time from the possible sync targets
对于节点是否healthy的判断,各个版本不同,但是其目的都是找出正常运转的节点。在2.0版本中,它的判断还包括了salve delay这个因素。
在secondary运行db.adminCommand({replSetGetStatus:1}) 或者rs.status()命令来查看当前的节点状况,可以看到syncingTo这个字段,这个字段的值就是这个secondary的同步来源。
2.2新增replSetSyncFrom命令,可以指定从哪个节点同步数据。


db.adminCommand( { replSetSyncFrom: "[hostname]:[port]" } )

或者

rs.syncFrom("[hostname]:[port]")

如何选择最邻近的节点?看源码,最新2.2.2为例:mongodb-src-r2.2.2/src/mongo/db/repl/rs_initialsync.cpp


Member* ReplSetImpl::getMemberToSyncTo() {
 lock lk(this);
 
bool buildIndexes = true;
 
// if we have a target we’ve requested to sync from, use it
 
if (_forceSyncTarget) {
 Member* target = _forceSyncTarget;
 _forceSyncTarget = 0;
 sethbmsg( str::stream() << “syncing to: ” << target->fullName() << ” by request”, 0);
 return target;
 }
 
Member* primary = const_cast(box.getPrimary());
 
// wait for 2N pings before choosing a sync target
 if (_cfg) {
 int needMorePings = config().members.size()*2 – HeartbeatInfo::numPings;
 
if (needMorePings > 0) {
 OCCASIONALLY log() << “waiting for ” << needMorePings << ” pings from other members before syncing” << endl;
 return NULL;
 }
 
buildIndexes = myConfig().buildIndexes;
 
// If we are only allowed to sync from the primary, return that
 if (!_cfg->chainingAllowed()) {
 // Returns NULL if we cannot reach the primary
 return primary;
 }
 }
 
// find the member with the lowest ping time that has more data than me
 
// Find primary’s oplog time. Reject sync candidates that are more than
 // MAX_SLACK_TIME seconds behind.
 OpTime primaryOpTime;
 static const unsigned maxSlackDurationSeconds = 10 * 60; // 10 minutes
 if (primary)
 primaryOpTime = primary->hbinfo().opTime;
 else
 // choose a time that will exclude no candidates, since we don’t see a primary
 primaryOpTime = OpTime(maxSlackDurationSeconds, 0);
 
if ( primaryOpTime.getSecs() < maxSlackDurationSeconds ) {
 // erh – I think this means there was just a new election
 // and we don’t yet know the new primary’s optime
 primaryOpTime = OpTime(maxSlackDurationSeconds, 0);
 }
 
OpTime oldestSyncOpTime(primaryOpTime.getSecs() – maxSlackDurationSeconds, 0);
 
Member *closest = 0;
 time_t now = 0;
 
// Make two attempts. The first attempt, we ignore those nodes with
 // slave delay higher than our own. The second attempt includes such
 // nodes, in case those are the only ones we can reach.
 // This loop attempts to set ‘closest’.
for (int attempts = 0; attempts < 2; ++attempts) {
 for (Member *m = _members.head(); m; m = m->next()) {
 if (!m->hbinfo().up())
 continue;
 // make sure members with buildIndexes sync from other members w/indexes
 if (buildIndexes && !m->config().buildIndexes)
 continue;
 
if (!m->state().readable())
 continue;
 
if (m->state() == MemberState::RS_SECONDARY) {
 // only consider secondaries that are ahead of where we are 只考虑OpTime在当前节点之前的节点
if (m->hbinfo().opTime <= lastOpTimeWritten)
 continue;
 // omit secondaries that are excessively behind, on the first attempt at least.
 if (attempts == 0 &&
 m->hbinfo().opTime < oldestSyncOpTime)
 continue;
 }
 
// omit nodes that are more latent than anything we’ve already considered 忽略ping值延迟的
if (closest &&
 (m->hbinfo().ping > closest->hbinfo().ping))
 continue;
 
if (attempts == 0 &&
 (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) {
 continue; // skip this one in the first attempt
 }
 
map::iterator vetoed = _veto.find(m->fullName());
 if (vetoed != _veto.end()) {
 // Do some veto housekeeping
 if (now == 0) {
 now = time(0);
 }
 
// if this was on the veto list, check if it was vetoed in the last “while”. 判断是否在否决列表里
// if it was, skip.
 if (vetoed->second >= now) {
 if (time(0) % 5 == 0) {
 log() << “replSet not trying to sync from ” << (*vetoed).first
 << “, it is vetoed for ” << ((*vetoed).second – now) << ” more seconds” << rsLog;
 }
 continue;
 }
 _veto.erase(vetoed);
 // fall through, this is a valid candidate now
 }
 // This candidate has passed all tests; set ‘closest’  满足所有条件,设置为最近的节点
closest = m;
 }
 if (closest) break; // no need for second attempt
 }
 
if (!closest) {
 return NULL;
 }
 
sethbmsg( str::stream() << “syncing to: ” << closest->fullName(), 0);
 
return closest;
 }

 

热门栏目