ZooKeeper源码分析:Quorum请求的整个流程(5)

3)调用SyncRequestProcessor处理器的processRequest方法。该方法会将请求放入SyncRequestProcessor.queuedRequests队列中。(【Leader A, Step 7(1)】SyncRequestProcessor线程会记录Log, 然后传递给SendAckRequestProcessor。SendAckRequestProcessor会发送一个Leader.ACK的Quorum数据包给自己)

如果是LearnerSyncRequest类型,说明该请求是OpCode.sync操作,则会直接调用Leader.processSync方法。

ProposalRequestProcessor的processRequest方法如下:

public void processRequest(Request request) throws RequestPrzocessorException {
        //如果是sync操作,则调用Leader.processSync方法
        if (request instanceof LearnerSyncRequest){
            zks.getLeader().processSync(( LearnerSyncRequest)request);
        }
        //如果不是sync操作
        else {
            //传递到下一个处理器
            nextProcessor.processRequest(request);
            if (request.hdr != null) {
                // We need to sync and get consensus on any transactions
                try {
                    //发送proposal给所有的follower
                    zks.getLeader().propose(request);
                } catch (XidRolloverException e) {
                    throw new RequestProcessorException (e.getMessage(), e);
                }
                //调用SyncRequestProcessor处理器的processRequest方法
                syncProcessor.processRequest(request);
            }
        }
    }
Leader的propose方法如下:
    /**
    * 创建Proposal,并发送给所有的members
    * @param request
    * @return the proposal that is queued to send to all the members
    */
    public Proposal propose(Request request) throws XidRolloverException {


        //解决 rollover的问题,所有低32位重置表示一个新的leader选择。强制重新选择Leader。
        //See ZOOKEEPER- 1277
        if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
            String msg =
                    "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
            shutdown(msg);
            throw new XidRolloverException (msg);
        }
     
        //将request.hdr和request.txn序列化到boa中
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive. getArchive(baos);
        try {
            request.hdr.serialize(boa, "hdr" );
            if (request.txn != null) {
                request. txn.serialize(boa, "txn" );
            }
            baos.close();
        } catch (IOException e) {
            LOG.warn( "This really should be impossible" , e);
        }
        //构造Leader.PROPOSAL的QuorumPacket
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                baos.toByteArray(), null );
        //构造Proposal对象
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        synchronized (this ) {
            if (LOG .isDebugEnabled()) {
                LOG.debug( "Proposing:: " + request);
            }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bf8a6d0ea7831f3c07af4e6087dbcc6f.html