Hadoop Ecosystem (Part 27) - MapReduce Job Submission Source Code Analysis
Preface
Parts of this article are adapted from training materials from 尚硅谷, 黑马, and other providers.
1. Debug Environment Setup
1.1 Code to Debug: WordCount, the Classic Introductory MR Example
1.1.1 The Mapper Class
```java
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split("\\s+");
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
```
1.1.2 The Reducer Class
```java
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
```
1.1.3 The Driver (Main) Class
```java
public class WordCountDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), WordCountDriver.class.getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(getConf());
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new WordCountDriver(), args);
        System.exit(status);
    }
}
```
2. Tracing the MapReduce Job Submission Source Code
For how to use the debug features, see: 《IntelliJ IDEA Debug工具的使用》 (Using the IntelliJ IDEA Debug Tool).
2.1 The MapReduce Program Entry Point
A MapReduce program written in Java has the main method as its entry point. In main, ToolRunner launches the MapReduce client driver class; the driver's logic is implemented in its run method:
```java
@Override
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), WordCountDriver.class.getSimpleName());
    job.setJarByClass(this.getClass());
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    FileSystem fs = FileSystem.get(getConf());
    if (fs.exists(new Path(args[1]))) {
        fs.delete(new Path(args[1]), true);
    }
    return job.waitForCompletion(true) ? 0 : 1;
}
```
2.2 job.waitForCompletion
The last call the client makes is Job.waitForCompletion(). As the name suggests, this method waits for the MR program to finish. Stepping into it: after the state check confirms the Job can be submitted (state == JobState.DEFINE), it executes submit(). monitorAndPrintJob() then repeatedly fetches the job's progress information and prints it. The boolean parameter verbose controls this behavior: true means print the running progress; false means simply wait for the job to finish without printing progress logs.
```java
public boolean waitForCompletion(boolean verbose)
        throws IOException, InterruptedException, ClassNotFoundException {
    if (state == JobState.DEFINE) {
        submit();  // submit the job if it has not been submitted yet
    }
    if (verbose) {
        // poll the job's progress and print it
        monitorAndPrintJob();
    } else {
        // just sleep-poll until the job completes, printing nothing
        int completionPollIntervalMillis =
            Job.getCompletionPollInterval(cluster.getConf());
        while (!isComplete()) {
            try {
                Thread.sleep(completionPollIntervalMillis);
            } catch (InterruptedException ie) {
            }
        }
    }
    return isSuccessful();
}
```
2.3 job.submit
```java
public void submit()
        throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();  // prefer the new org.apache.hadoop.mapreduce API
    connect();       // build the Cluster instance, i.e. connect to the cluster
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
                ClassNotFoundException {
            // the actual submission logic lives in JobSubmitter
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}
```
2.3.1 connect
When a MapReduce job is submitted, connecting to the cluster is handled by Job's connect() method, which in essence constructs a Cluster instance named cluster. Cluster is a utility for connecting to a MapReduce cluster and provides ways to obtain information about it.

Inside Cluster lives the client communication protocol: a ClientProtocol instance named client, constructed by a ClientProtocolProvider's create() method. Hadoop 2.x provides two flavors of ClientProtocol: YARNRunner for YARN mode and LocalJobRunner for local mode, and it is these that Cluster actually uses to communicate with the cluster. In YARN mode, the YARNRunner object internally holds a ResourceManager proxy, a ResourceMgrDelegate instance named resMgrDelegate; the entire MapReduce client relies on it to communicate with the YARN cluster, completing processes such as job submission and job status queries, and obtaining cluster information through it.
```java
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
        cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run() throws IOException, InterruptedException,
                    ClassNotFoundException {
                return new Cluster(getConfiguration());
            }
        });
    }
}
```
2.3.1.1 Cluster
The two most important member variables of the Cluster class are the client protocol provider, ClientProtocolProvider, and the client communication protocol, ClientProtocol, whose instance is named client; the latter is produced by the former's create() method.
ClientProtocol defines many methods that the client can call to submit or kill a job, or to query program status.
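A few of these methods, abridged from the Hadoop 2.x source of org.apache.hadoop.mapreduce.protocol.ClientProtocol (the exact set varies by version):

```java
public interface ClientProtocol extends VersionedProtocol {
  // allocate a new JobID for a job about to be submitted
  public JobID getNewJobID() throws IOException, InterruptedException;

  // submit the job to the cluster, returning its status
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
      throws IOException, InterruptedException;

  // kill the given job
  public void killJob(JobID jobid) throws IOException, InterruptedException;

  // query the status of the given job
  public JobStatus getJobStatus(JobID jobid)
      throws IOException, InterruptedException;
  // ... plus methods for queue info, task reports, counters, etc.
}
```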
The initialization work is done in Cluster's constructor.
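An abridged sketch of the relevant fields and constructors, based on the Hadoop 2.x source (field order and details differ slightly between versions):

```java
public class Cluster {
  private ClientProtocolProvider clientProtocolProvider;  // the provider that was picked
  private ClientProtocol client;                          // YARNRunner or LocalJobRunner
  private UserGroupInformation ugi;
  private Configuration conf;

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);  // pick a provider and create the ClientProtocol
  }
  // ...
}
```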
2.3.1.2 initialize
Cluster's constructor calls the initialize method, which takes each ClientProtocolProvider in turn and tries to construct a ClientProtocol instance through its create() method. If the configuration contains no YARN settings, a LocalJobRunner is built and the MR job runs locally; if YARN is configured, a YARNRunner is built and the job runs on the YARN cluster.
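Abridged from the Hadoop 2.x source, where frameworkLoader is a ServiceLoader&lt;ClientProtocolProvider&gt; that discovers the providers on the classpath (Hadoop 3.x restructures this loop slightly):

```java
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        if (clientProtocol != null) {
          // the first provider that returns a non-null protocol wins
          clientProtocolProvider = provider;
          client = clientProtocol;
          break;
        }
      } catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to plugin not found.", e);
      }
    }
  }
  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}
```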
2.3.2 ClientProtocolProvider
The create() calls above come from implementations of ClientProtocolProvider.
In MapReduce, the abstract class ClientProtocolProvider has two implementations: YarnClientProtocolProvider for YARN mode and LocalClientProtocolProvider for local mode.
Accordingly, the ClientProtocol instance held by Cluster is either a YARNRunner (YARN mode) or a LocalJobRunner (local mode).
2.3.2.1 LocalClientProtocolProvider
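LocalClientProtocolProvider only produces a protocol when mapreduce.framework.name is unset or set to local. Abridged from the Hadoop 2.x source:

```java
public class LocalClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    String framework =
        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;  // YARN (or another framework) is configured; defer to it
    }
    conf.setInt(JobContext.NUM_MAPS, 1);
    return new LocalJobRunner(conf);  // run the MR job in the local JVM
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
    return null;  // LocalJobRunner doesn't use a socket address
  }

  @Override
  public void close(ClientProtocol clientProtocol) {
    // nothing to clean up
  }
}
```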
2.3.2.2 YarnClientProtocolProvider
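Symmetrically, YarnClientProtocolProvider only produces a protocol when mapreduce.framework.name is set to yarn. Abridged from the Hadoop 2.x source:

```java
public class YarnClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);  // submit through YARN
    }
    return null;
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
      throws IOException {
    return create(conf);
  }

  @Override
  public void close(ClientProtocol clientProtocol) throws IOException {
    if (clientProtocol instanceof YARNRunner) {
      ((YARNRunner) clientProtocol).close();
    }
  }
}
```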
The most important field in YARNRunner is resMgrDelegate, an instance of ResourceMgrDelegate, the proxy for the ResourceManager.
In YARN mode, the entire MapReduce client relies on it to communicate with the YARN cluster, completing processes such as job submission and job status queries, and obtaining cluster information through it. Internally it holds a YarnClient instance responsible for talking to YARN, as well as application-specific members such as ApplicationId and ApplicationSubmissionContext.
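The chained constructors make this wiring visible; abridged from the Hadoop 2.x source (details vary by version):

```java
public YARNRunner(Configuration conf) {
  // build the ResourceManager proxy from the YARN configuration
  this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
}

public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
  this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
}

public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
    ClientCache clientCache) {
  this.conf = conf;
  try {
    this.resMgrDelegate = resMgrDelegate;
    this.clientCache = clientCache;
    this.defaultFileContext = FileContext.getFileContext(this.conf);
  } catch (UnsupportedFileSystemException ufe) {
    throw new RuntimeException("Error in instantiating YarnClient", ufe);
  }
}
```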
2.3.3 submitJobInternal
At the end of the submit method, the submitter's submitJobInternal method is called to perform the actual submission. It is the internal method for submitting a Job and implements all of the submission's business logic.
The JobSubmitter class has four member variables (sketched below):
- jtFs, a FileSystem instance used to operate on the various files the job needs to run;
- submitClient, a ClientProtocol instance used to interact with the cluster for job submission, status queries, and so on;
- submitHostName, the hostname from which the job is submitted;
- submitHostAddress, the host address from which the job is submitted.
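Abridged from the Hadoop source:

```java
class JobSubmitter {
  private FileSystem jtFs;             // file system for staging job resources
  private ClientProtocol submitClient; // YARNRunner or LocalJobRunner
  private String submitHostName;       // host name of the submitting client
  private String submitHostAddress;    // host address of the submitting client

  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
      throws IOException {
    this.submitClient = submitClient;
    this.jtFs = submitFs;
  }
  // ...
}
```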
Below is the core code that submits the job:
```java
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  // validate the output specification (e.g. the output dir must not exist)
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  // staging directory where the job's resources will be uploaded
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // record the host submitting the job
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // ask the cluster for a new JobID
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // per-job submit directory under the staging area
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation tokens for the submit dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate"
          + "data spill is enabled");
    }

    // upload the job jar, libjars, files and archives to the submit dir
    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // compute the input splits; the split count becomes the number of map tasks
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
        MRJobConfig.DEFAULT_JOB_MAX_MAP);
    if (maxMaps >= 0 && maxMaps < maps) {
      throw new IllegalArgumentException("The number of map tasks " + maps
          + " exceeded limit " + maxMaps);
    }

    // write the queue admins of the submit queue into the job file
    String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // remove jobtoken referrals before copying the jobconf to HDFS
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // write job.xml to the submit dir
    writeConf(conf, submitJobFile);

    // now, actually submit the job through the ClientProtocol
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
```
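Of the steps above, writeSplits is the one that determines job parallelism: it computes the input splits, writes them into the submit directory, and returns the split count, which becomes the number of map tasks. Abridged from the Hadoop source:

```java
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException, InterruptedException,
    ClassNotFoundException {
  JobConf jConf = (JobConf) job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    // new API: delegate to the configured InputFormat's getSplits()
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    // old mapred API
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
```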