Hadoop Map/Reduce 执行流程关键代码
JobClient.runJob(conf) | 运行job
|-->JobClient jc = new JobClient(job);
|-->RunningJob rj = jc.submitJob(job);
|-->submitJobInternal(job);
|-->int reduces = job.getNumReduceTasks();
|-->JobContext context = new JobContext(job, jobId);
|-->maps = writeOldSplits(job, submitSplitFile);
|-->job.setNumMapTasks(maps);
|-->job.writeXml(out);
|-->JobStatus status = jobSubmitClient.submitJob(jobId);
JobTracker.submitJob(JobId) |提交job
|-->JobInProgress job = new JobInProgress(jobId, this, this.conf);
|-->checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB); |检查权限
|-->checkMemoryRequirements(job); |检查内存需求
|-->addJob(jobId, job); |添加至job队列
|-->jobs.put(job.getProfile().getJobID(), job);
|--> for (JobInProgressListener listener : jobInProgressListeners) |添加至监听器,供调度使用
|-->listener.jobAdded(job);
JobTracker.heartbeat() |JobTracker启动后供TaskTracker以RPC方式来调用,返回HeartbeatResponse(其中包含待执行的actions数组)
|-->List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
|-->tasks = taskScheduler.assignTasks(taskTrackerStatus); |通过调度器选择合适的tasks
|-->for (Task task : tasks)
|-->expireLaunchingTasks.addNewTask(task.getTaskID());
|-->actions.add(new LaunchTaskAction(task)); |实际actions还会添加commitTask等
|-->response.setHeartbeatInterval(nextInterval);
|-->response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));
|-->return response;
TaskTracker.offerService |TaskTracker启动后通过offerService()不断发心跳至JobTracker中
|-->transmitHeartBeat()
|-->HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited,askForNewTask, heartbeatResponseId);
|-->TaskTrackerAction[] actions = heartbeatResponse.getActions();
|-->for(TaskTrackerAction action: actions)
|-->if (action instanceof LaunchTaskAction)
|-->addToTaskQueue((LaunchTaskAction)action); |添加至执行Queue,根据map/reduce task分别添加
|-->if (action.getTask().isMapTask()) {
|-->mapLauncher.addToTaskQueue(action);
|-->TaskInProgress tip = registerTask(action, this);
|-->tasksToLaunch.add(tip);
|-->tasksToLaunch.notifyAll(); |唤醒在tasksToLaunch上等待的TaskLauncher线程
|-->else
|-->reduceLauncher.addToTaskQueue(action);
TaskLauncher.run()
|--> while (tasksToLaunch.isEmpty())
|-->tasksToLaunch.wait();
|-->tip = tasksToLaunch.remove(0);
|-->startNewTask(tip);
|-->localizeJob(tip);
|-->launchTaskForJob(tip, new JobConf(rjob.jobConf));
|-->tip.setJobConf(jobConf);
|-->tip.launchTask(); |TaskInProgress.launchTask()
|-->this.runner = task.createRunner(TaskTracker.this, this); |区分map/reduce
|-->this.runner.start();
MapTaskRunner.run() |执行MapTask
|-->File workDir = new File(lDirAlloc.getLocalPathToRead() |准备执行路径
|-->String jar = conf.getJar(); |准备jar包
|-->File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java"); |获取jvm
|-->vargs.add(Child.class.getName()); |添加参数,Child类作为main主函数启动
|-->tracker.addToMemoryManager(t.getTaskID(), t.i