ZooKeeper源码分析:Quorum请求的整个流程(3)

public void run () {
        ......
        case Leader.REQUEST :
          bb = ByteBuffer. wrap(qp .getData());
          //从QuorumPacket中读取sesssionId
          sessionId = bb.getLong();
          //从QuorumPacket中读取 cxid
          cxid = bb.getInt();
          //从QuorumPacket中读取操作类型
          type = bb.getInt();
          bb = bb.slice();
          Request si;
          //如果操作Code的类型是OpCode.sync,则构造LearnerSyncRequest对象
          if (type == OpCode.sync){
              si = new LearnerSyncRequest( this , sessionId, cxid, type , bb, qp.getAuthinfo());
          }
            //如果操作Code的类型不是OpCode.sync, 则构造Request对象
          else {
              si = new Request( null , sessionId, cxid, type , bb, qp.getAuthinfo());
          }


          //设置owner
          si.setOwner( this );
          //提交请求
          leader.zk .submitRequest(si);
          break ;
      ......
  }
PrepRequestProcessor处理器线程会从PrepRequestProcessor.submittedRequests队列中取出Request对象,并根据Request类型构建TxnHeader和Record对象,然后分别赋给Request.hdr和Request.txn。之后会调用下一个处理器ProposalRequestProcessor的processRequest方法,将Request对象传递给处理器ProposalRequestProcessor。(如果发现有异常会则会创建一个错误Record类型对象)

PrepRequestProcessor的run方法如下:

public void run() {
        try {
            while (true ) {
                //从submittedRequests队列中取去第一个request对象
                Request request = submittedRequests .take();
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                //如果是OpCode.ping操作,则将traceMask设置成ZooTrace. CLIENT_PING_TRACE_MASK
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace. CLIENT_PING_TRACE_MASK;
                }
                if (LOG .isTraceEnabled()) {
                    ZooTrace. logRequest( LOG, traceMask, 'P' , request, "" );
                }
                //如果request是一个requestOfDeath, 则退出while循环。
                if (Request.requestOfDeath == request) {
                    break ;
                }
                //处理请求
                pRequest(request);
            }
        } catch (InterruptedException e) {
            LOG.error( "Unexpected interruption" , e);
        } catch (RequestProcessorException e) {
            if (e.getCause() instanceof XidRolloverException) {
                LOG.info(e.getCause().getMessage());
            }
            LOG.error( "Unexpected exception" , e);
        } catch (Exception e) {
            LOG.error( "Unexpected exception" , e);
        }
        LOG.info( "PrepRequestProcessor exited loop!" );
    } 
   
PrepRequestProcessor的pRequest2Txn方法,该方法会在pRequest方法中调用,构建TxnHeader和Record对象。下面是关于OpCode.setData请求的代码:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bf8a6d0ea7831f3c07af4e6087dbcc6f.html