public void run () {
......
case Leader.REQUEST :
bb = ByteBuffer. wrap(qp .getData());
//从QuorumPacket中读取sesssionId
sessionId = bb.getLong();
//从QuorumPacket中读取 cxid
cxid = bb.getInt();
//从QuorumPacket中读取操作类型
type = bb.getInt();
bb = bb.slice();
Request si;
//如果操作Code的类型是OpCode.sync,则构造LearnerSyncRequest对象
if (type == OpCode.sync){
si = new LearnerSyncRequest( this , sessionId, cxid, type , bb, qp.getAuthinfo());
}
//如果操作Code的类型不是OpCode.sync, 则构造Request对象
else {
si = new Request( null , sessionId, cxid, type , bb, qp.getAuthinfo());
}
//设置owner
si.setOwner( this );
//提交请求
leader.zk .submitRequest(si);
break ;
......
}
PrepRequestProcessor处理器线程会从PrepRequestProcessor.submittedRequests队列中取出Request对象,并根据Request类型构建TxnHeader和Record对象,然后分别赋给Request.hdr和Request.txn。之后会调用下一个处理器ProposalRequestProcessor的processRequest方法,将Request对象传递给处理器ProposalRequestProcessor。(如果发现有异常会则会创建一个错误Record类型对象)
PrepRequestProcessor的run方法如下:
public void run() {
try {
while (true ) {
//从submittedRequests队列中取去第一个request对象
Request request = submittedRequests .take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
//如果是OpCode.ping操作,则将traceMask设置成ZooTrace. CLIENT_PING_TRACE_MASK
if (request.type == OpCode.ping) {
traceMask = ZooTrace. CLIENT_PING_TRACE_MASK;
}
if (LOG .isTraceEnabled()) {
ZooTrace. logRequest( LOG, traceMask, 'P' , request, "" );
}
//如果request是一个requestOfDeath, 则退出while循环。
if (Request.requestOfDeath == request) {
break ;
}
//处理请求
pRequest(request);
}
} catch (InterruptedException e) {
LOG.error( "Unexpected interruption" , e);
} catch (RequestProcessorException e) {
if (e.getCause() instanceof XidRolloverException) {
LOG.info(e.getCause().getMessage());
}
LOG.error( "Unexpected exception" , e);
} catch (Exception e) {
LOG.error( "Unexpected exception" , e);
}
LOG.info( "PrepRequestProcessor exited loop!" );
}
PrepRequestProcessor的pRequest2Txn方法,该方法会在pRequest方法中调用,构建TxnHeader和Record对象。下面是关于OpCode.setData请求的代码: