protected void pRequest2Txn( int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
request.hdr = new TxnHeader(request.sessionId , request.cxid, zxid,
zks.getTime(), type);
switch (type) {
.....
case OpCode.setData:
//检查session
zks.sessionTracker .checkSession(request.sessionId, request.getOwner());
//将record转成SetDataRequest类型
SetDataRequest setDataRequest = ( SetDataRequest)record;
if (deserialize)
//将Request.reques数据反序列化成setDataRequest对象
ByteBufferInputStream.byteBuffer2Record(request. request, setDataRequest);
//获取需要需要修改的znode的path
path = setDataRequest.getPath();
//获取内存数据中获取path对于的znode信息
nodeRecord = getRecordForPath( path);
//检查对 znode是否有写权限
checkACL( zks, nodeRecord .acl , ZooDefs.Perms.WRITE,
request.authInfo);
//获取客户端设置的版本号
version = setDataRequest.getVersion();
//获取节点当前版本号
int currentVersion = nodeRecord.stat.getVersion();
//如果客户端设置的版本号不是-1,且不等于当前版本号,则抛出KeeperException.BadVersionException异常
if (version != -1 && version != currentVersion) {
throw new KeeperException .BadVersionException(path);
}
//version等于当前版本加1
version = currentVersion + 1;
//构建SetDataTxn对象,并赋给request.txn
request. txn = new SetDataTxn( path, setDataRequest.getData(), version);
//拷贝nodeRecord
nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
//将nodeRecord的当前版本号设置为version
nodeRecord.stat.setVersion( version);
//将nodeRecord放入outstandingChanges
//path和nodeRecord map放入outstandingChangesForPath
addChangeRecord( nodeRecord);
break ;
......
}
}
【Leader A, Step 5,6】处理器ProposalRequestProcessor会先判断Request对象是否是LearnerSyncRequest类型。
如果不是LearnerSyncRequest类型(也就是Quorum请求),会按如下步骤执行:
1)调用下一个处理器CommitProcessor的processRequest方法,将Request对象放入CommitProcessor.queuedRequests队列中;
2)将proposal发送到所有的Follower;