/**
*当接收到一个COMMIT消息,这个方法会被调用。该方法会将COMMIT消息
*中的zxid和pendingTxns队列中的第一个对象的zxid进行匹配。如何相同,则
*传递给处理器CommitProcessor进行commit
* @param zxid - must correspond to the head of pendingTxns if it exists
*/
public void commit( long zxid ) {
if (pendingTxns .size() == 0) {
LOG.warn( "Committing " + Long. toHexString (zxid)
+ " without seeing txn" );
return ;
}
//取��pendingTxns第一个元素的 zxid
long firstElementZxid = pendingTxns .element().zxid;
//如果第一个元素的 zxid不等于COMMIT消息中的 zxid, 则退出程序
if (firstElementZxid != zxid) {
LOG.error( "Committing zxid 0x" + Long. toHexString (zxid)
+ " but next pending txn 0x"
+ Long. toHexString(firstElementZxid));
System. exit(12);
}
//pendingTxns取出,并删除第一个元素
Request request = pendingTxns .remove();
//将从pendingTxns队列中取出的第一个 reqeust对象传递给CommitProcessor处理器进行commit
commitProcessor.commit(request);
}
【All Follower, Step 11】处理器CommitProcessor线程会处理提交的Request对象。
如果是Follower A, nextPending对象是和提交Request对象是一致的,所以将提交Request对象内容替换nextPending中的内容,并放入toProcess队列中。在下一个循环会从toProcess队列中取出并传递到下一个迭代器FinalRequestProcessor中。(和Leader中的CommitProcessor线程处理逻辑是一样的)
如果不是Follower A, 则可能有下面两种情况:
1)queuedRequest队列为empty且nextPending为null, 也就是这个Follower没有自己转发的request正在处理;
2)nextPending不为null, 也就是有转发的request正在处理。但nextPending对象一定和提交的Request对象是不一致的。
不管是哪一种,都会直接将提交的Request对象加入到toProcess队列中。处理器CommitProcessor线程会从中取出并传递到下一个迭代器FinalRequestProcessor中。
CommitProcessor.run方法如下:
public void run() {
try {
Request nextPending = null;
while (!finished ) {
int len = toProcess .size();
for (int i = 0; i < len; i++) {
nextProcessor.processRequest( toProcess .get(i));
}
//当将所有的request传递到下一个处理器FinalRequestProcessor后,将toProcess清空
toProcess.clear();
synchronized (this ) {
//如果queuedRequests队列为空,或者nextPending为null, 或者committedRequest队列为控股,则等待。
if ((queuedRequests .size() == 0 || nextPending != null )
&& committedRequests.size() == 0) {
wait();
continue ;
}
//第一步,检查这个commit是否为了pending request而来
//如果commit request到来,但是queuedRequests为空,或者nextPending为null
if ((queuedRequests .size() == 0 || nextPending != null )
&& committedRequests.size() > 0) {
Request r = committedRequests .remove();
/*
* We match with nextPending so that we can move to the
* next request when it is committed. We also want to
* use nextPending because it has the cnxn member set
* properly.
*/
//如果nextPending不等于null,
if (nextPending != null
&& nextPending. sessionId == r.sessionId
&& nextPending. cxid == r.cxid ) {
// we want to send our version of the request.
// the pointer to the connection in the request
nextPending.hdr = r. hdr;
nextPending. txn = r.txn ;
nextPending. zxid = r.zxid ;
toProcess.add(nextPending);
nextPending = null ;
} else {
// this request came from someone else so just
// send the commit packet
//如果这个请求来自于其他人,则直接加入到toProcess中
//sync请求,或者不是Follower发起的请求
toProcess.add(r);
}
}
}