Hadoop JobTracker提交job源码浅析(2)

6.JobTracker.main():在实例化jobTracker之后,会调用tracker.offerService()方法,之后main方法就没什么了,下面看看tracker.offerService()这个方法。

public static void main(String argv[]
                          ) throws IOException, InterruptedException {
    StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
   
    try {
      if(argv.length == 0) {
        JobTracker tracker = startTracker(new JobConf());
        tracker.offerService();
      }
      else {
        if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
          dumpConfiguration(new PrintWriter(System.out));
        }
        else {
          System.out.println("usage: JobTracker [-dumpConfiguration]");
          System.exit(-1);
        }
      }
    } catch (Throwable e) {
      LOG.fatal(StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }

7.JobTracker.offerService():这个方法中有一些其他东西,略掉,只看taskScheduler.start()这个方法,因为这里只是想分析下JobTracker提交job的过程,所以省去很多复杂的东西。

8.taskScheduler.start():这个方法就是启动TaskScheduler,这个方法不同taskScheduler也不同,但是统一的还是会有一个taskTrackerManager.addJobInProgressListener(jobListener)这个操作,taskTrackerManager就是jobTracker(第5步),这句的意思是为jobTracker添加jobListener,用来监听job的。这句的内部就是调用jobTracker的jobInProgressListeners集合的add(listener)方法。

到这里可以说看完了整个JobTracker的启动过程,虽然很浅显,但是对于后面将要分析的内容,这些就够了。下面来看看job的提交过程,也就是jobTracker的submit()方法。

1.jobTracker.submit():第一步是checkSafeMode(),检查是否在安全模式,在安全模式则抛出异常。然后执行jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),new Path(jobSubmitDir),生成一个jobInfo对象,jobInfo主要保存job的id,user,jobSubmitDir(也就是job的任务目录,上一篇文章提到)。接着是判断job是否可被recovered(job失败的时候尝试再次执行),如果允许的话(默认允许),则将jobInfo对象序列化到job-info文件中。接着到达最关键的地方,job = new JobInProgress(this, this.conf, jobInfo, 0, ts),为job实例化一个JobInProgress对象,这个对象将会对job以后的所有情况进行负责,如初始化,执行等。下面看看JobInProgress对象的初始化操作。

2.JobInProgress:这里看下将job.xml下载到本地的操作。然后就是job的队列信息,默认的队列名是default,Queue queue = this.jobtracker.getQueueManager().getQueue(queueName),这个主要是根据Hadoop所使用的taskScheduler有关,具体不研究。剩下的是一些参数的初始化,如map的数目,reduce的数目等。这里还有个设置job的优先级的,默认是normal。this.priority = conf.getJobPriority();this.status.setJobPriority(this.priority);还有检查taskLimit的操作,就是检查map+reduce的任务数是否超出mapred.jobtracker.maxtasks.per.job设置的值,默认是-1,就是没有限制的意思。回到jobTracker.submit()方法

this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
          +"/"+jobId + ".xml");
      Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
      jobFile = jobFilePath.toString();
      fs.copyToLocalFile(jobFilePath, localJobFile);
      conf = new JobConf(localJobFile);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/67d76d6fd816a2fbe1b1d392c3c9baca.html