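3) Call the processRequest method of the SyncRequestProcessor. This method puts the request into the SyncRequestProcessor.queuedRequests queue. ([Leader A, Step 7(1)] The SyncRequestProcessor thread writes the request to the log and then passes it to SendAckRequestProcessor, which sends a Leader.ACK quorum packet to the leader itself.)
If the request is a LearnerSyncRequest, it is an OpCode.sync operation, and Leader.processSync is called directly.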
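The queue hand-off described in step 3 can be pictured with a small stand-alone model. This is only a sketch under simplifying assumptions: SyncQueueSketch, its string "requests", and the "log:"/"ack:" event labels are hypothetical and are not ZooKeeper classes or APIs. The real SyncRequestProcessor does considerably more than this. The sketch shows only the shape of the flow: the caller enqueues and returns immediately, while a separate thread logs each request and then hands it to the acknowledgement stage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the enqueue -> log -> ack hand-off.
// None of these names are ZooKeeper types.
class SyncQueueSketch {
    // Sentinel value that tells the worker thread to stop.
    private static final String SHUTDOWN = "__shutdown__";

    private final BlockingQueue<String> queuedRequests = new LinkedBlockingQueue<>();
    // Written only by the worker; read by callers only after join().
    private final List<String> events = new ArrayList<>();
    private final Thread worker = new Thread(this::drain, "sync-sketch");

    void start() {
        worker.start();
    }

    // Caller side: only enqueue, never wait for the log write.
    void processRequest(String request) {
        queuedRequests.add(request);
    }

    // Worker side: record the request in the "log", then pass it to the "ack" stage.
    private void drain() {
        try {
            while (true) {
                String request = queuedRequests.take();
                if (SHUTDOWN.equals(request)) {
                    return;
                }
                events.add("log:" + request);
                events.add("ack:" + request);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stops the worker and returns the recorded events in processing order.
    List<String> shutdownAndGetEvents() {
        queuedRequests.add(SHUTDOWN);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return events;
    }

    public static void main(String[] args) {
        SyncQueueSketch sketch = new SyncQueueSketch();
        sketch.start();
        sketch.processRequest("create /a");
        sketch.processRequest("setData /a");
        // prints [log:create /a, ack:create /a, log:setData /a, ack:setData /a]
        System.out.println(sketch.shutdownAndGetEvents());
    }
}
```

Because a single worker drains a FIFO queue, each request is logged before it is acknowledged, and requests are acknowledged in the order in which they were enqueued.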
The processRequest method of ProposalRequestProcessor is as follows:
public void processRequest(Request request) throws RequestProcessorException {
    // If this is a sync operation, call Leader.processSync
    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest) request);
    }
    // Otherwise (not a sync operation)
    else {
        // Pass the request on to the next processor
        nextProcessor.processRequest(request);
        if (request.hdr != null) {
            // We need to sync and get consensus on any transactions
            try {
                // Send the proposal to all followers
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // Call the processRequest method of SyncRequestProcessor
            syncProcessor.processRequest(request);
        }
    }
}
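The XidRolloverException caught above is raised by Leader.propose when the low 32 bits of the zxid are exhausted. The following sketch illustrates the bit layout behind that check: the high 32 bits of a zxid carry the epoch and the low 32 bits carry a per-epoch counter. ZxidSketch and its method names are hypothetical helpers written for illustration and are not part of ZooKeeper. Only the mask comparison mirrors the condition used in propose.

```java
// Hypothetical helper, not part of ZooKeeper: illustrates the zxid layout
// (high 32 bits = epoch, low 32 bits = counter) behind the rollover check.
class ZxidSketch {
    static final long COUNTER_MASK = 0xffffffffL;

    // Epoch stored in the high 32 bits.
    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    // Per-epoch counter stored in the low 32 bits.
    static long counterOf(long zxid) {
        return zxid & COUNTER_MASK;
    }

    // Combine an epoch and a counter into a single 64-bit zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & COUNTER_MASK);
    }

    // Same condition as the check in propose: every low bit is set,
    // so the counter has no room left within this epoch.
    static boolean counterExhausted(long zxid) {
        return (zxid & COUNTER_MASK) == COUNTER_MASK;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 0xffffffffL);
        System.out.println(Long.toHexString(zxid));   // prints 5ffffffff
        System.out.println(counterExhausted(zxid));   // prints true
        System.out.println(counterExhausted(makeZxid(5, 42))); // prints false
    }
}
```

Once the counter is exhausted, the next zxid can only be obtained by moving to a new epoch, which is why the leader shuts down and forces a re-election instead of issuing the proposal.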
The propose method of Leader is as follows:
/**
 * Create a Proposal and send it to all the members.
 * @param request
 * @return the proposal that is queued to send to all the members
 */
public Proposal propose(Request request) throws XidRolloverException {
    // Address the rollover issue: all lower 32 bits being set means the counter is
    // exhausted and a new leader election is needed. Force a re-election of the leader.
    // See ZOOKEEPER-1277
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg =
            "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }
    // Serialize request.hdr and request.txn into boa
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    try {
        request.hdr.serialize(boa, "hdr");
        if (request.txn != null) {
            request.txn.serialize(boa, "txn");
        }
        baos.close();
    } catch (IOException e) {
        LOG.warn("This really should be impossible", e);
    }
    // Build the Leader.PROPOSAL QuorumPacket
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
            baos.toByteArray(), null);
    // Build the Proposal object
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    synchronized (this) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }