FairScheduler job初始化过程源码浅析(2)

4.JobTracker.initJob():主要调用job.initTasks(),下面进入到JobInProgress.initTasks()。

5.JobInProgress.initTasks():为job对象设置优先级setPriority(this.priority),接着读取分片信息文件获取分片信息,SplitMetaInfoReader.readSplitMetaInfo()这个方就是jobInPorgress用来读取分分片信息的,读取过程与写入过程相对应,具体还是较简单的。读取了分片信息之后,根据分片数量创建相应数量的mapTask(TaskInProgress对象),接下来会执行nonRunningMapCache = createCache(splits, maxLevel),这个方法是根据每个分片的location信息,然后根据location的host判断每个host上所有的job,并放入cache中。接着根据设置的reduce数量新建对应的reduceTask(TaskInProgress对象),并加入到nonRunningReduces队列中,并根据mapred.reduce.slowstart.completed.maps(百分比,默认是5%)参数的值计算completedMapsForReduceSlowstart(多少map任务完成的时候启动reduce任务)。之后就是分别新建两个setUp任务和cheanUp任务,分别对应map和reduce task。到此initTask完成,initTask完成JobTracker的initJob也就差不多完成了,接着FairScheduler的updateRunnability()也就完成了。回到FairScheduler.update()。

6.FairScheduler.update():

for (Pool pool: poolMgr.getPools()) {
        pool.getMapSchedulable().updateDemand();
        pool.getReduceSchedulable().updateDemand();
      }
     
      // Compute fair shares based on updated demands
      List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
      List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
      SchedulingAlgorithms.computeFairShares(
          mapScheds, clusterStatus.getMaxMapTasks());
      SchedulingAlgorithms.computeFairShares(
          reduceScheds, clusterStatus.getMaxReduceTasks());
     
      // Use the computed shares to assign shares within each pool
      for (Pool pool: poolMgr.getPools()) {
        pool.getMapSchedulable().redistributeShare();
        pool.getReduceSchedulable().redistributeShare();
      }
     
      if (preemptionEnabled)
        updatePreemptionVariables();
    }

看不懂,先到这吧,等下次慢慢研究吧。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/b863d2b6f31e8e2afdef814c23e98022.html