A Brief Look at the FairScheduler Job Initialization Source Code (3)

Let's pick up the question left open last time.

for (Pool pool: poolMgr.getPools()) {
  pool.getMapSchedulable().updateDemand();
  pool.getReduceSchedulable().updateDemand();
}

This loop updates each pool's slot demand. Below we look at pool.getMapSchedulable().updateDemand(); pool.getReduceSchedulable().updateDemand() works essentially the same way.

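7. PoolSchedulable.updateDemand(): The first statement, poolMgr.getMaxSlots(pool.getName(), taskType), gets the pool's maximum slot count from the configuration file, which was loaded earlier, as described in a previous part. Each PoolSchedulable holds several JobSchedulable objects, which are added in JobListener.addJob(); each JobSchedulable corresponds to one JobInProgress object. The method then calls JobSchedulable.updateDemand() to update each JobSchedulable's slot demand and adds it to the pool's total, stopping as soon as the total reaches maxTasks.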

public void updateDemand() {
  // limit the demand to maxTasks
  int maxTasks = poolMgr.getMaxSlots(pool.getName(), taskType);
  demand = 0;
  for (JobSchedulable sched: jobScheds) {
    sched.updateDemand();
    demand += sched.getDemand();
    if (demand >= maxTasks) {
      demand = maxTasks;
      break;
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("The pool " + pool.getName() + " demand is " + demand
        + "; maxTasks is " + maxTasks);
  }
}
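
The capping loop in PoolSchedulable.updateDemand() can be tried in isolation with the minimal sketch below. It assumes each job's demand has already been computed and is passed in as a plain list of integers; PoolDemandSketch and poolDemand are hypothetical names, not part of the FairScheduler source.

```java
import java.util.List;

class PoolDemandSketch {
    // Hypothetical helper: sums per-job demands and caps the total at maxSlots,
    // breaking out early the same way PoolSchedulable.updateDemand() does.
    static int poolDemand(List<Integer> jobDemands, int maxSlots) {
        int demand = 0;
        for (int d : jobDemands) {
            demand += d;
            if (demand >= maxSlots) {
                demand = maxSlots; // pool never asks for more than its maximum
                break;
            }
        }
        return demand;
    }

    public static void main(String[] args) {
        System.out.println(poolDemand(List.of(3, 4, 5), 10)); // 10 (capped)
        System.out.println(poolDemand(List.of(3, 4), 10));    // 7
    }
}
```

Because the loop breaks as soon as the cap is reached, in the real method the JobSchedulables after that point do not have updateDemand() called during that round.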

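8. JobSchedulable.updateDemand(): The first step checks whether the job behind this JobSchedulable is running (RUNNING); if it is not, it demands no slots. Next it checks whether the JobSchedulable is for maps or for reduces. For reduces, it first checks whether the number of finished maps (finishedMapTasks) plus the number of failed map TIPs (failedMapTIPs) has reached completedMapsForReduceSlowstart, which is the value of the mapred.reduce.slowstart.completed.maps parameter multiplied by numMapTasks. If so, reduce tasks may start; otherwise the reduce demand stays at 0. For maps, the demand is computed directly.

The computation then gathers three values. TaskInProgress[] tips = (taskType == TaskType.MAP ? job.getTasks(TaskType.MAP) : job.getTasks(TaskType.REDUCE)) gets the job's map or reduce TaskInProgress objects (TIPs); boolean speculationEnabled = (taskType == TaskType.MAP ? job.getMapSpeculativeExecution() : job.getReduceSpeculativeExecution()) tells whether speculative execution is enabled; and double avgProgress = (taskType == TaskType.MAP ? job.getStatus().mapProgress() : job.getStatus().reduceProgress()) gets the overall map or reduce progress, i.e., how much of the map or reduce phase is done, which is used to judge whether an individual TIP is lagging behind.

It then adds up the slot demand of every TIP that is not yet complete. For a running TIP, demand += tip.getActiveTasks().size(): each active attempt occupies one slot. A TIP's active tasks are registered when the task is scheduled, i.e., when tip.addRunningTask() is called, and that call chain starts in FairScheduler.assignTasks(), the task-assignment method. If speculative execution is enabled and the TIP needs a speculative attempt, one more slot is added for it; a TIP that has not been launched yet counts as one slot. The sum over all TIPs is the JobSchedulable's demand, and summing the jobs' demands gives the pool's demand, which PoolSchedulable.updateDemand() caps at maxTasks (the pool's maximum slot count) as soon as the running total reaches it. Control then returns to FairScheduler.update().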
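
The demand rules above can be sketched as follows. This is a simplified model, not the Hadoop code: Tip is a hypothetical stand-in for TaskInProgress (complete/running flags, number of active attempts, and whether a speculative attempt is wanted), and rounding the slow-start threshold up with Math.ceil is an assumption of the sketch.

```java
import java.util.List;

class JobDemandSketch {
    // Hypothetical stand-in for TaskInProgress; not the Hadoop class.
    static final class Tip {
        final boolean complete;
        final boolean running;
        final int activeAttempts;
        final boolean wantsSpeculative;

        Tip(boolean complete, boolean running, int activeAttempts, boolean wantsSpeculative) {
            this.complete = complete;
            this.running = running;
            this.activeAttempts = activeAttempts;
            this.wantsSpeculative = wantsSpeculative;
        }
    }

    // Reduce slow-start check: finished + failed maps must reach slowstart * numMaps
    // (rounded up here; the rounding is an assumption of this sketch).
    static boolean reducesCanStart(int finishedMaps, int failedMaps, int numMaps, float slowstart) {
        int threshold = (int) Math.ceil(slowstart * numMaps);
        return finishedMaps + failedMaps >= threshold;
    }

    static int jobDemand(boolean jobRunning, boolean isReduce, boolean reducesReady,
                         List<Tip> tips, boolean speculationEnabled) {
        if (!jobRunning) return 0;               // non-running jobs demand nothing
        if (isReduce && !reducesReady) return 0; // reduces wait for enough maps
        int demand = 0;
        for (Tip tip : tips) {
            if (tip.complete) continue;          // finished TIPs need no slots
            if (tip.running) {
                demand += tip.activeAttempts;    // one slot per active attempt
                if (speculationEnabled && tip.wantsSpeculative) {
                    demand += 1;                 // room for a speculative attempt
                }
            } else {
                demand += 1;                     // not launched yet: one slot
            }
        }
        return demand;
    }

    public static void main(String[] args) {
        List<Tip> tips = List.of(
            new Tip(true, false, 0, false),  // complete
            new Tip(false, false, 0, false), // pending
            new Tip(false, true, 1, true),   // running and lagging
            new Tip(false, true, 2, false)); // running with two attempts
        System.out.println(jobDemand(true, false, true, tips, true));  // 5
        System.out.println(jobDemand(true, false, true, tips, false)); // 4
        System.out.println(reducesCanStart(4, 1, 10, 0.5f));           // true
    }
}
```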

9. FairScheduler.update():

List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
SchedulingAlgorithms.computeFairShares(
    mapScheds, clusterStatus.getMaxMapTasks());
SchedulingAlgorithms.computeFairShares(
    reduceScheds, clusterStatus.getMaxReduceTasks());

// Use the computed shares to assign shares within each pool
for (Pool pool: poolMgr.getPools()) {
  pool.getMapSchedulable().redistributeShare();
  pool.getReduceSchedulable().redistributeShare();
}

if (preemptionEnabled)
  updatePreemptionVariables();

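This is the heart of FairScheduler: the resource-allocation algorithm. The first two statements fetch all map PoolSchedulables and all reduce PoolSchedulables; each pool contains exactly one map PoolSchedulable and one reduce PoolSchedulable. The next two statements perform the actual allocation by calling SchedulingAlgorithms.computeFairShares(), once for maps against the cluster's maximum map slots (clusterStatus.getMaxMapTasks()) and once for reduces against its maximum reduce slots (clusterStatus.getMaxReduceTasks()). The loop that follows uses these pool-level shares to assign shares within each pool (redistributeShare()), and if preemption is enabled, updatePreemptionVariables() is called.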

10. SchedulingAlgorithms.computeFairShares(), and the helper it relies on, slotsUsedWithWeightToSlotRatio():

private static double slotsUsedWithWeightToSlotRatio(double w2sRatio,
    Collection<? extends Schedulable> schedulables) {
  // Total slots handed out if every schedulable
  // received computeShare(sched, w2sRatio)
  double slotsTaken = 0;
  for (Schedulable sched: schedulables) {
    double share = computeShare(sched, w2sRatio);
    slotsTaken += share;
  }
  return slotsTaken;
}

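This helper calls computeShare() with each Schedulable's weight and w2sRatio to compute how many slots that Schedulable should get, and sums the results. w2sRatio is the weight-to-slot ratio (slots per unit of weight), not a total weight: computeFairShares() starts it at 1.0, doubles it to find an upper bound, and then binary-searches for the value at which the shares add up to the available slots, or to the total demand if that is smaller.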
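
The search for w2sRatio can be sketched as below. This follows my understanding of the Hadoop 1.x approach rather than quoting it: Sched is a hypothetical minimal schedulable (weight, minShare, demand), the 25-iteration limit is an assumption, and weights are assumed to be positive, since otherwise the doubling loop might never stop.

```java
import java.util.List;

class FairShareSketch {
    // Hypothetical minimal schedulable; weights are assumed to be > 0.
    static final class Sched {
        final double weight;
        final double minShare;
        final double demand;
        double fairShare;

        Sched(double weight, double minShare, double demand) {
            this.weight = weight;
            this.minShare = minShare;
            this.demand = demand;
        }
    }

    static double computeShare(Sched s, double w2sRatio) {
        double share = s.weight * w2sRatio;
        share = Math.max(share, s.minShare); // at least the minimum share
        return Math.min(share, s.demand);    // never more than the demand
    }

    static double slotsUsedWithWeightToSlotRatio(double w2sRatio, List<Sched> scheds) {
        double slotsTaken = 0;
        for (Sched s : scheds) {
            slotsTaken += computeShare(s, w2sRatio);
        }
        return slotsTaken;
    }

    static void computeFairShares(List<Sched> scheds, double totalSlots) {
        double totalDemand = 0;
        for (Sched s : scheds) {
            totalDemand += s.demand;
        }
        double cap = Math.min(totalDemand, totalSlots);
        // Upper bound for the ratio: double from 1.0 until enough slots are used.
        double rMax = 1.0;
        while (slotsUsedWithWeightToSlotRatio(rMax, scheds) < cap) {
            rMax *= 2.0;
        }
        // Bisection: slots used grows monotonically with the ratio.
        double left = 0.0;
        double right = rMax;
        for (int i = 0; i < 25; i++) {
            double mid = (left + right) / 2.0;
            if (slotsUsedWithWeightToSlotRatio(mid, scheds) < cap) {
                left = mid;
            } else {
                right = mid;
            }
        }
        for (Sched s : scheds) {
            s.fairShare = computeShare(s, right);
        }
    }

    public static void main(String[] args) {
        List<Sched> pools = List.of(
            new Sched(1.0, 0.0, 10.0),
            new Sched(1.0, 0.0, 2.0),
            new Sched(2.0, 0.0, 100.0));
        computeFairShares(pools, 20.0);
        for (Sched p : pools) {
            System.out.printf("%.2f%n", p.fairShare); // 6.00, 2.00, 12.00
        }
    }
}
```

With three pools of weights 1, 1 and 2 and demands 10, 2 and 100 sharing 20 slots, the ratio settles at about 6: the second pool is capped at its demand of 2, and the remaining 18 slots split 6 : 12 in proportion to weight.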

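11. SchedulingAlgorithms.computeShare(): The first statement, double share = sched.getWeight() * w2sRatio, uses the schedulable's weight. For a pool, this is the weight set for the pool in fair-scheduler.xml (default 1.0); for a job, getWeight() is computed by FairScheduler.getJobWeight(), listed below. That method returns 1.0 for a job that is not runnable; otherwise it starts at 1.0, switches to log2(1 + demand) when size-based weighting is enabled, multiplies by a factor derived from the job's priority, and finally passes the result through the optional user-supplied weightAdjuster. Multiplying the weight by w2sRatio gives a tentative share, which share = Math.max(share, sched.getMinShare()) raises to at least the minimum share (minShare defaults to 0) and share = Math.min(share, sched.getDemand()) caps at the demand. The result is the share.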
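
Written as a formula, with w_i the weight, R the w2sRatio, and minShare_i and demand_i the values returned by getMinShare() and getDemand(), computeShare() is the clamp below. For example, with w = 1 and R = 6, a schedulable with minShare 0 and demand 10 gets 6, one with minShare 8 and demand 10 gets 8, and one with minShare 0 and demand 5 gets 5.

```latex
\mathrm{share}_i(R) = \min\bigl(\max(w_i R,\ \mathrm{minShare}_i),\ \mathrm{demand}_i\bigr)
```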

public double getJobWeight(JobInProgress job, TaskType taskType) {
  if (!isRunnable(job)) {
    // Job won't launch tasks, but don't return 0 to avoid division errors
    return 1.0;
  } else {
    double weight = 1.0;
    if (sizeBasedWeight) {
      // Set weight based on runnable tasks
      JobInfo info = infos.get(job);
      int runnableTasks = (taskType == TaskType.MAP) ?
          info.mapSchedulable.getDemand() :
          info.reduceSchedulable.getDemand();
      weight = Math.log1p(runnableTasks) / Math.log(2);
    }
    weight *= getPriorityFactor(job.getPriority());
    if (weightAdjuster != null) {
      // Run weight through the user-supplied weightAdjuster
      weight = weightAdjuster.adjustWeight(job, taskType, weight);
    }
    return weight;
  }
}

private static double computeShare(Schedulable sched, double w2sRatio) {
  double share = sched.getWeight() * w2sRatio;
  share = Math.max(share, sched.getMinShare());
  share = Math.min(share, sched.getDemand());
  return share;
}
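
The size-based part of getJobWeight() is easy to check numerically: Math.log1p(n) / Math.log(2) equals log2(1 + n), so a job's weight grows only logarithmically with its demand. The sketch below is a hypothetical simplification: JobWeightSketch and jobWeight are illustrative names, the priority factor is passed in as a number instead of being looked up from the job's priority, and the weightAdjuster step is left out.

```java
class JobWeightSketch {
    // Simplified getJobWeight(): priorityFactor is supplied by the caller
    // (hypothetical), and the optional weightAdjuster step is omitted.
    static double jobWeight(boolean runnable, boolean sizeBasedWeight,
                            int runnableTasks, double priorityFactor) {
        if (!runnable) {
            return 1.0; // same fallback as getJobWeight(): never a zero weight
        }
        double weight = 1.0;
        if (sizeBasedWeight) {
            weight = Math.log1p(runnableTasks) / Math.log(2); // log2(1 + runnableTasks)
        }
        return weight * priorityFactor;
    }

    public static void main(String[] args) {
        // Demands of 1, 7 and 1023 tasks give weights of about 1, 3 and 10.
        for (int n : new int[] {1, 7, 1023}) {
            System.out.printf("%d -> %.3f%n", n, jobWeight(true, true, n, 1.0));
        }
    }
}
```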

Copyright notice: unless otherwise noted, all articles on this site are original.

When reposting, please credit the source: http://www.heiqu.com/b863d2b6f31e8e2afdef814c23e98022.html