博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Job流程:提交MR-Job过程
阅读量:5153 次
发布时间:2019-06-13

本文共 12551 字,大约阅读时间需要 41 分钟。

1.一个标准 MR-Job 的执行入口:

//参数 true 表示检查并打印 Job 和 Task 的运行状况System.exit(job.waitForCompletion(true) ? 0 : 1);

2.job.waitForCompletion(true)方法的内部实现:

//job.waitForCompletion()方法的内部实现public boolean waitForCompletion(boolean verbose                                   ) throws IOException, InterruptedException,                                            ClassNotFoundException {    if (state == JobState.DEFINE) {      submit(); //此方法的核心在于submit()    }    if (verbose) { //根据传入的参数,决定是否打印Job运行的详细过程      monitorAndPrintJob();    } else {      // get the completion poll interval from the client.      int completionPollIntervalMillis =         Job.getCompletionPollInterval(cluster.getConf());      while (!isComplete()) {        try {          Thread.sleep(completionPollIntervalMillis);        } catch (InterruptedException ie) {        }      }}

3. Job 类 submit()方法的内部实现:

public void submit()          throws IOException, InterruptedException, ClassNotFoundException {    ensureState(JobState.DEFINE);       setUseNewAPI();//使用MapReduce新的API      connect();//返回一个【客户端代理对象Cluster】(属于Job类)用于和服务端NN建立RPC通信     final JobSubmitter submitter =         getJobSubmitter(cluster.getFileSystem(), cluster.getClient());    status = ugi.doAs(new PrivilegedExceptionAction
() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {  //提交Job  return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING;//设置 JobStatus 为 Running   LOG.info("The url to track the job: " + getTrackingURL());}

3.1.1.查看Connect()方法的内部实现:

private synchronized void connect()          throws IOException, InterruptedException, ClassNotFoundException {    if (cluster == null) {      cluster =         ugi.doAs(new PrivilegedExceptionAction
() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { //返回一个Cluster对象,并将此对象作为 Job 类的一个成员变量 //即 Job 类持有 Cluster 的引用。 return new Cluster(getConfiguration()); } }); }}

3.1.2.查看new Cluster()的实现过程:

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)       throws IOException {    this.conf = conf;    this.ugi = UserGroupInformation.getCurrentUser();    initialize(jobTrackAddr, conf);//重点在于此方法的内部实现}

3.1.3.客户端代理对象Cluster实例化过程:

synchronized (frameworkLoader) {      for (ClientProtocolProvider provider : frameworkLoader) {        LOG.debug("Trying ClientProtocolProvider : "            + provider.getClass().getName()); //ClientProtocol是Client和NN通信的RPC协议,根据RPC通信原理,此协议接口中必定包含一个 versionID 字段。         ClientProtocol clientProtocol = null;         try {          if (jobTrackAddr == null) {
//provider创建YARNRunner对象 clientProtocol = provider.create(conf); } else { clientProtocol = provider.create(jobTrackAddr, conf); } if (clientProtocol != null) { //初始化Cluster内部成员变量 clientProtocolProvider = provider; client = clientProtocol; //创建Cluster类的客户端代理对象client LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider"); break; } else { LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol"); } } catch (Exception e) { LOG.info("Failed to use " + provider.getClass().getName() + " due to error: " + e.getMessage()); } } }

3.1.4.ClientProtocol接口中包含的versionID 字段

//Version 37: More efficient serialization format for framework counterspublic static final long versionID = 37L;

3.1.5.provider.create()方法创建【客户端代理对象】有两种实现方式:LocalClientProtocolProvider(本地模式,此处不做研究) 和 YarnClientProtocolProvider(Yarn模式)。

public ClientProtocol create(Configuration conf) throws IOException {    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {      return new YARNRunner(conf);//实例化【客户端代理对象YARNRunner】    }    return null;}

3.1.6.new YARNRunner()方法的实现

其中,ResourceMgrDelegate实际上ResourceManager的代理类,其实现了YarnClient接口,通过ApplicationClientProtocol代理直接向RM提交Job,杀死Job,查看Job运行状态等操作。同时,在ResourceMgrDelegate类中会通过YarnConfiguration来读取yarn-site.xml、core-site.xml等配置文件中的配置属性。

public YARNRunner(Configuration conf) {   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));  }
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,      ClientCache clientCache) {    this.conf = conf;    try {      this.resMgrDelegate = resMgrDelegate;      this.clientCache = clientCache;      this.defaultFileContext = FileContext.getFileContext(this.conf);    } catch (UnsupportedFileSystemException ufe) {      throw new RuntimeException("Error in instantiating YarnClient", ufe);    }  }

3.2.1.查看 JobSubmitter 类中 submitJobInternal()方法的实现:

JobStatus submitJobInternal(Job job, Cluster cluster)   throws ClassNotFoundException, InterruptedException, IOException {    //检查job的输出路径是否存在,如果存在则抛出异常     checkSpecs(job); //返回存放Job相关资源【比如jar包,Job.xml,Splits文件等】路径的前缀     //默认位置 /tmp/hadoop-yarn/staging/root/.staging,可通过 yarn.app.mapreduce.am.staging-dir 修改        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,                                                      job.getConfiguration());    //获取从命令行配置的Job参数    Configuration conf = job.getConfiguration(); //获取客户端的主机名和IP     InetAddress ip = InetAddress.getLocalHost();    if (ip != null) {      submitHostAddress = ip.getHostAddress();      submitHostName = ip.getHostName();      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);    }     //通过RPC,向Yarn的ResourceManager申请JobID对象     JobID jobId = submitClient.getNewJobID();    job.setJobID(jobId); //将 存放路径的前缀 和 JobId 拼接成完整的【Job相关文件的存放路径】     Path submitJobDir = new Path(jobStagingArea, jobId.toString());    JobStatus status = null;    try {      conf.set(MRJobConfig.USER_NAME,          UserGroupInformation.getCurrentUser().getShortUserName());      conf.set("hadoop.http.filter.initializers",           "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());      LOG.debug("Configuring job " + jobId + " with " + submitJobDir           + " as the submit dir");      // get delegation token for the dir      TokenCache.obtainTokensForNamenodes(job.getCredentials(),          new Path[] { submitJobDir }, conf);            populateTokenCache(conf, job.getCredentials());      // generate a secret to authenticate shuffle transfers      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {        KeyGenerator keyGen;        try {          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);          keyGen.init(SHUFFLE_KEY_LENGTH);        } catch (NoSuchAlgorithmException e) {          throw new IOException("Error generating shuffle secret key", e);        }        SecretKey shuffleKey = keyGen.generateKey();        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),            job.getCredentials());      }      //向集群中拷贝所需文件,默认写入 10 份(mapreduce.client.submit.file.replication)       copyAndConfigureFiles(job, submitJobDir);      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); // Create the splits for the job      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); //计算并确定map的个数,以及各个输入切片 Splits 的相关信息【后面详述】 int maps = writeSplits(job, submitJobDir);      conf.setInt(MRJobConfig.NUM_MAPS, maps);      LOG.info("number of splits:" + maps);      // write "queue admins of the queue to which job is being submitted"      // to job file.(设置调度队列名)      String queue = conf.get(MRJobConfig.QUEUE_NAME,          JobConf.DEFAULT_QUEUE_NAME);      AccessControlList acl = submitClient.getQueueAdmins(queue);      conf.set(toFullPropertyName(queue,          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());      // removing jobtoken referrals before copying the jobconf to HDFS      // as the tasks don't need this setting, actually they may break      // because of it if present as the referral will point to a      // different job.      TokenCache.cleanUpTokenReferral(conf);      if (conf.getBoolean(          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {        // Add HDFS tracking ids        ArrayList
trackingIds = new ArrayList
(); for (Token
t : job.getCredentials().getAllTokens()) { trackingIds.add(t.decodeIdentifier().getTrackingId()); } conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS, trackingIds.toArray(new String[trackingIds.size()])); } // Write job file to submit dir //写入job.xml writeConf(conf, submitJobFile); // // Now, actually submit the job (using the submit name) //    printTokens(jobId, job.getCredentials()); //真正的提交任务方法submitJob()【详细分析】 status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); if (status != null) { return status; } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (jtFs != null && submitJobDir != null) jtFs.delete(submitJobDir, true); } } }

3.2.2.查看submitClient.submitJob()方法的实现:

  submitJob()方法是接口 ClientProtocol(RPC 协议)中的一个抽象方法。根据 RPC 原理,在【客户端代理对象submitClient】调用RPC协议中的submitJob()方法,此方法一定在服务端执行。该方法也有两种实现: LocalJobRunner(本地模式,略)和 YARNRunner(YARN模式)。

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)  throws IOException, InterruptedException {        addHistoryToken(ts);        // 构建必要的信息,以启动 MR-AM    ApplicationSubmissionContext appContext =      createApplicationSubmissionContext(conf, jobSubmitDir, ts);    //提交Job到RM,返回applicationId    try {      ApplicationId applicationId =          resMgrDelegate.submitApplication(appContext);      ApplicationReport appMaster = resMgrDelegate          .getApplicationReport(applicationId);      String diagnostics =          (appMaster == null ?              "application report is null" : appMaster.getDiagnostics());      if (appMaster == null          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {        throw new IOException("Failed to run job : " +            diagnostics);      }      //最后返回 Job 此时的状态,函数退出      return clientCache.getClient(jobId).getJobStatus(jobId);    } catch (YarnException e) {      throw new IOException(e);    }}

总结:

1.为什么会产生Yarn?

  Hadoop1.0生态几乎是以MapReduce为核心的,其扩展性差、资源利用率低、可靠性等问题都越来越让人觉得不爽,于是才产生了Yarn,并且Hadoop2.0生态都是以Yarn为核心。Storm、Spark等都可以基于Yarn使用。

2.Configuration类的作用是什么?

  配置文件类Configuration,是Hadoop各个模块的公共使用类,用于加载类路径下的各种配置文件,读写其中的配置选项。

3.GenericOptionsParser类的作用是什么?  
4.如何将命令行中的参数配置到变量conf中?
5.哪个方法会获得传入的参数?

  GenericOptionsParser类是将命令行中参数自动设置到变量conf中。其构造方法内部调用parseGeneralOptions()对传入的参数进行解析。

Configuration conf = new Configuration();        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

6.如何在命令行指定reduce的个数?

  命令行配置参数的规则是:-D加MapReduce的配置选项,当然还支持-fs等其他参数传入。

7.默认情况map、reduce为几?

  默认情况下Reduce的数目为1,Map的数目也为1。

8.setJarByClass的作用是什么?

  setJarByClass()首先判断当前Job的状态是否是运行中,接着通过class找到其所属的jar文件,将jar路径赋值给mapreduce.job.jar属性。至于寻找jar文件的方法,则是通过classloader获取类路径下的资源文件,进行循环遍历。具体实现见ClassUtil类中的findContainingJar方法。

9.如果想在控制台打印job(maoreduce)当前的进度,需要设置哪个参数?

  如果想在控制台打印当前的进度,则设置job.waitForCompletion(true)的参数为true。

10.配置了哪个参数,在提交job的时候,会创建一个YARNRunner对象来进行任务的提交?

  如果当前在HDFS的配置文件中配置了mapreduce.framework.name属性为“yarn”的话,会创建一个YARNRunner对象来进行任务的提交。

11.哪个类实现了读取yarn-site.xml、core-site.xml等配置文件中的配置属性的?  
12.JobSubmitter类中的哪个方法实现了把job提交到集群?

  JobSubmitter类中的submitJobInternal()方法。

13.DistributedCache在mapreduce中发挥了什么作用?

  文件上传到HDFS之后,还要被DistributedCache进行缓存起来。这是因为计算节点收到该作业的第一个任务后,就会用DistributedCache自动将作业文件Cache到节点本地目录下,并且会对压缩文件进行解压,如:.zip,.jar,.tar等等,然后开始任务。最后,对于同一个计算节点接下来收到的任务,DistributedCache不会重复去下载作业文件,而是直接运行任务。如果一个作业的任务数很多,这种设计避免了在同一个节点上对用一个job的文件会下载多次,大大提高了任务运行的效率。

14.对每个输入文件进行split划分,是物理划分还是逻辑划分,他们有什么区别?

  逻辑划分。存储时分块Block是物理划分。

15.分片的大小有哪些因素来决定?

  
16.分片是如何计算得来的?
  三个参数,详见
  

转载于:https://www.cnblogs.com/skyl/p/4746446.html

你可能感兴趣的文章
201421410014蒋佳奇
查看>>
Xcode5和ObjC新特性
查看>>
Centos 7.0 安装Mono 3.4 和 Jexus 5.6
查看>>
CSS属性值currentColor
查看>>
Real-Time Rendering 笔记
查看>>
实验四2
查看>>
多路复用
查看>>
Java学习笔记--字符串和文件IO
查看>>
在js在添版本号
查看>>
sublime3
查看>>
JIRA
查看>>
小技巧——直接在目录中输入cmd然后就打开cmd命令窗口
查看>>
深浅拷贝(十四)
查看>>
HDU 6370(并查集)
查看>>
BZOJ 1207(dp)
查看>>
Attributes.Add用途与用法
查看>>
L2-001 紧急救援 (dijkstra+dfs回溯路径)
查看>>
javascript 无限分类
查看>>
spring IOC装配Bean(注解方式)
查看>>
[面试算法题]有序列表删除节点-leetcode学习之旅(4)
查看>>