From c3445a1e5239a2886070f9a97bb0e3da6b3de60f Mon Sep 17 00:00:00 2001 From: hansbogert Date: Mon, 13 Apr 2015 14:32:36 +0200 Subject: [PATCH 01/14] Update README.md correct spelling mistake propertios->properties --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a9a4390..3b3d285 100644 --- a/README.md +++ b/README.md @@ -135,7 +135,7 @@ operation](http://hadoop.apache.org/docs/stable/single_node_setup.html#PseudoDis ``` -[More details on configuration propertios can be found here.](configuration.md) +[More details on configuration properties can be found here.](configuration.md) #### Start #### From b69bf8f8bb12d2f571778916183506dfc65781b5 Mon Sep 17 00:00:00 2001 From: hansbogert Date: Fri, 24 Apr 2015 14:52:53 +0200 Subject: [PATCH 02/14] Use MESOS_NATIVE_JAVA_LIBRARY in README.md Warning: MESOS_NATIVE_LIBRARY is deprecated, use MESOS_NATIVE_JAVA_LIBRARY instead. Future releases will not support JNI bindings via MESOS_NATIVE_LIBRARY. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 3b3d285..cd5ab0f 100644 --- a/README.md +++ b/README.md @@ -145,13 +145,13 @@ to the Mesos native library. On Linux: ``` -$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.so hadoop jobtracker +$ MESOS_NATIVE_JAVA_LIBRARY=/path/to/libmesos.so hadoop jobtracker ``` And on OS X: ``` -$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.dylib hadoop jobtracker +$ MESOS_NATIVE_JAVA_LIBRARY=/path/to/libmesos.dylib hadoop jobtracker ``` > **NOTE: You do not need to worry about distributing your Hadoop From 73569c50f77d0d4a6cd4a33615f3b47ff8a6b502 Mon Sep 17 00:00:00 2001 From: Brian Topping Date: Fri, 8 May 2015 18:33:32 +0700 Subject: [PATCH 03/14] * Restructure o.a.h.mapred.MesosScheduler#assignTasks to always pass through tasks, then decide whether they are a managed TaskTracker * ResourcePolicy is abstract for all intents, but it could be instantiated. 
Make it literally abstract * A bunch of lint warnings removed * Rearrange code to be easier to read -- interface implementations commented and methods in order, update some docs to JavaDoc format. --- pom.xml | 3 +- .../apache/hadoop/mapred/MesosExecutor.java | 124 +++--- .../apache/hadoop/mapred/MesosScheduler.java | 419 ++++++++---------- .../apache/hadoop/mapred/MesosTracker.java | 51 ++- .../apache/hadoop/mapred/ResourcePolicy.java | 276 ++++++------ .../hadoop/mapred/ResourcePolicyFixed.java | 21 +- .../hadoop/mapred/ResourcePolicyVariable.java | 101 ++--- 7 files changed, 477 insertions(+), 518 deletions(-) diff --git a/pom.xml b/pom.xml index f9b0832..4c82358 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.apache.mesos hadoop-mesos - 0.1.0 + 0.1.1-SNAPSHOT UTF-8 @@ -15,6 +15,7 @@ 1.1.3 3.1 + 2.5.0-mr1-cdh5.2.0 0.21.0 2.5.0 diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 8ab2a4f..d2ab783 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -21,49 +21,18 @@ public class MesosExecutor implements Executor { public static final Log LOG = LogFactory.getLog(MesosExecutor.class); - private SlaveInfo slaveInfo; - private TaskTracker taskTracker; protected final ScheduledExecutorService timerScheduler = - Executors.newScheduledThreadPool(1); + Executors.newScheduledThreadPool(1); + private SlaveInfo slaveInfo; + private TaskTracker taskTracker; public static void main(String[] args) { MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor()); System.exit(driver.run() == Status.DRIVER_STOPPED ? 
0 : 1); } - private JobConf configure(final TaskInfo task) { - JobConf conf = new JobConf(false); - try { - byte[] bytes = task.getData().toByteArray(); - conf.readFields(new DataInputStream(new ByteArrayInputStream(bytes))); - } catch (IOException e) { - LOG.warn("Failed to deserialize configuration.", e); - System.exit(1); - } - - // Output the configuration as XML for easy debugging. - try { - StringWriter writer = new StringWriter(); - conf.writeXml(writer); - writer.flush(); - String xml = writer.getBuffer().toString(); - LOG.info("XML Configuration received:\n" + - org.apache.mesos.hadoop.Utils.formatXml(xml)); - } catch (Exception e) { - LOG.warn("Failed to output configuration as XML.", e); - } - - // Get hostname from Mesos to make sure we match what it reports - // to the JobTracker. - conf.set("slave.host.name", slaveInfo.getHostname()); - - // Set the mapred.local directory inside the executor sandbox, so that - // different TaskTrackers on the same host do not step on each other. 
- conf.set("mapred.local.dir", System.getenv("MESOS_DIRECTORY") + "/mapred"); - - return conf; - } +// --------------------- Interface Executor --------------------- @Override public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, @@ -72,6 +41,16 @@ public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, this.slaveInfo = slaveInfo; } + @Override + public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) { + LOG.info("Executor reregistered with the slave"); + } + + @Override + public void disconnected(ExecutorDriver driver) { + LOG.info("Executor disconnected from the slave"); + } + @Override public void launchTask(final ExecutorDriver driver, final TaskInfo task) { LOG.info("Launching task : " + task.getTaskId().getValue()); @@ -86,10 +65,7 @@ public void launchTask(final ExecutorDriver driver, final TaskInfo task) { try { taskTracker = new TaskTracker(conf); - } catch (IOException e) { - LOG.fatal("Failed to start TaskTracker", e); - System.exit(1); - } catch (InterruptedException e) { + } catch (IOException | InterruptedException e) { LOG.fatal("Failed to start TaskTracker", e); System.exit(1); } @@ -119,7 +95,7 @@ public void run() { // Stop the executor. 
driver.stop(); } catch (Throwable t) { - LOG.error("Caught exception, committing suicide.", t); + LOG.fatal("Caught exception, committing suicide.", t); driver.stop(); System.exit(1); } @@ -143,38 +119,61 @@ public void killTask(final ExecutorDriver driver, final TaskID taskId) { @Override public void run() { driver.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(taskId) - .setState(TaskState.TASK_FINISHED) - .build()); + .setTaskId(taskId) + .setState(TaskState.TASK_FINISHED) + .build()); } }.start(); } } - @Override - public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) { - LOG.info("Executor reregistered with the slave"); - } - - @Override - public void disconnected(ExecutorDriver driver) { - LOG.info("Executor disconnected from the slave"); - } - @Override public void frameworkMessage(ExecutorDriver d, byte[] msg) { LOG.info("Executor received framework message of length: " + msg.length + " bytes"); } + @Override + public void shutdown(ExecutorDriver d) { + LOG.info("Executor asked to shutdown"); + } + @Override public void error(ExecutorDriver d, String message) { LOG.error("MesosExecutor.error: " + message); } - @Override - public void shutdown(ExecutorDriver d) { - LOG.info("Executor asked to shutdown"); + private JobConf configure(final TaskInfo task) { + JobConf conf = new JobConf(false); + try { + byte[] bytes = task.getData().toByteArray(); + conf.readFields(new DataInputStream(new ByteArrayInputStream(bytes))); + } catch (IOException e) { + LOG.warn("Failed to deserialize configuration.", e); + System.exit(1); + } + + // Output the configuration as XML for easy debugging. 
+ try { + StringWriter writer = new StringWriter(); + conf.writeXml(writer); + writer.flush(); + String xml = writer.getBuffer().toString(); + LOG.info("XML Configuration received:\n" + + org.apache.mesos.hadoop.Utils.formatXml(xml)); + } catch (Exception e) { + LOG.warn("Failed to output configuration as XML.", e); + } + + // Get hostname from Mesos to make sure we match what it reports + // to the JobTracker. + conf.set("slave.host.name", slaveInfo.getHostname()); + + // Set the mapred.local directory inside the executor sandbox, so that + // different TaskTrackers on the same host do not step on each other. + conf.set("mapred.local.dir", System.getenv("MESOS_DIRECTORY") + "/mapred"); + + return conf; } public void revokeSlots() { @@ -190,6 +189,7 @@ public void revokeSlots() { // Be sure there's nothing running and nothing in the launcher queue. // If we expect to have no slots, let's go ahead and terminate the task launchers + // CONDITION IS ALWAYS TRUE if (maxMapSlots == 0) { try { Field launcherField = taskTracker.getClass().getDeclaredField("mapLauncher"); @@ -204,6 +204,7 @@ public void revokeSlots() { } } + // CONDITION IS ALWAYS TRUE if (maxReduceSlots == 0) { try { Field launcherField = taskTracker.getClass().getDeclaredField("reduceLauncher"); @@ -245,20 +246,17 @@ public void run() { try { taskTracker.shutdown(); - } catch (IOException e) { - LOG.error("Failed to shutdown TaskTracker", e); - } catch (InterruptedException e) { + } catch (IOException | InterruptedException e) { LOG.error("Failed to shutdown TaskTracker", e); } - } - else { + } else { try { Field field = taskTracker.getClass().getDeclaredField("tasksToCleanup"); field.setAccessible(true); BlockingQueue tasksToCleanup = ((BlockingQueue) field.get(taskTracker)); LOG.info("TaskTracker has " + taskTracker.tasks.size() + - " running tasks and " + tasksToCleanup + - " tasks to clean up."); + " running tasks and " + tasksToCleanup + + " tasks to clean up."); } catch 
(ReflectiveOperationException e) { LOG.fatal("Failed to get task counts from TaskTracker", e); } diff --git a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java index 3f1e63f..6577fe1 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java @@ -36,42 +36,44 @@ public class MesosScheduler extends TaskScheduler implements Scheduler { public static final double SLOT_CPUS_DEFAULT = 1; // 1 cores. public static final int SLOT_DISK_DEFAULT = 1024; // 1 GB. public static final int SLOT_JVM_HEAP_DEFAULT = 1024; // 1024MB. + public static final double TASKTRACKER_CPUS_DEFAULT = 1.0; // 1 core. public static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB. public static final int TASKTRACKER_DISK_DEFAULT = 1024; // 1 GB. + // The default behavior in Hadoop is to use 4 slots per TaskTracker: public static final int MAP_SLOTS_DEFAULT = 2; public static final int REDUCE_SLOTS_DEFAULT = 2; - // The amount of time to wait for task trackers to launch before - // giving up. + + // The amount of time to wait for task trackers to launch before giving up. public static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes public static final long PERIODIC_MS = 300000; // 5 minutes public static final long DEFAULT_IDLE_CHECK_INTERVAL = 5; // 5 seconds + // Destroy task trackers after being idle for N idle checks public static final long DEFAULT_IDLE_REVOCATION_CHECKS = 5; - private SchedulerDriver driver; + public Metrics metrics; protected TaskScheduler taskScheduler; protected JobTracker jobTracker; protected Configuration conf; protected File stateFile; + // Count of the launched trackers for TaskID generation. protected long launchedTrackers = 0; + // Use a fixed slot allocation policy? 
protected boolean policyIsFixed = false; protected ResourcePolicy policy; protected boolean enableMetrics = false; - public Metrics metrics; // Maintains a mapping from {tracker host:port -> MesosTracker}. // Used for tracking the slots of each TaskTracker and the corresponding // Mesos TaskID. - protected Map mesosTrackers = - new ConcurrentHashMap(); + protected Map mesosTrackers = new ConcurrentHashMap<>(); - protected final ScheduledExecutorService timerScheduler = - Executors.newScheduledThreadPool(1); + protected final ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(1); protected JobInProgressListener jobListener = new JobInProgressListener() { @Override @@ -107,8 +109,7 @@ public void jobUpdated(JobChangeEvent event) { for (String hostname : flakyTrackers) { for (MesosTracker mesosTracker : mesosTrackers.values()) { if (mesosTracker.host.getHostName().startsWith(hostname)) { - LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host " - + mesosTracker.host + " because it has been marked as flaky"); + LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host " + mesosTracker.host + " because it has been marked as flaky"); if (metrics != null) { metrics.flakyTrackerKilledMeter.mark(); } @@ -131,16 +132,14 @@ public void jobUpdated(JobChangeEvent event) { LOG.info("Completed job : " + job.getJobID()); // Remove the task from the map. - final Set trackers = new HashSet(mesosTrackers.keySet()); + final Set trackers = new HashSet<>(mesosTrackers.keySet()); for (HttpHost tracker : trackers) { MesosTracker mesosTracker = mesosTrackers.get(tracker); mesosTracker.jobs.remove(job.getJobID()); - // If the TaskTracker doesn't have any running job tasks assigned, - // kill it. + // If the TaskTracker doesn't have any running job tasks assigned, kill it. 
if (mesosTracker.jobs.isEmpty() && mesosTracker.active) { - LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host " - + mesosTracker.host + " because it is no longer needed"); + LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host " + mesosTracker.host + " because it is no longer needed"); killTracker(mesosTracker); } @@ -148,147 +147,10 @@ public void jobUpdated(JobChangeEvent event) { } } }; + private SchedulerDriver driver; - // TaskScheduler methods. - @Override - public synchronized void start() throws IOException { - conf = getConf(); - String taskTrackerClass = conf.get("mapred.mesos.taskScheduler", - "org.apache.hadoop.mapred.JobQueueTaskScheduler"); - - try { - taskScheduler = - (TaskScheduler) Class.forName(taskTrackerClass).newInstance(); - taskScheduler.setConf(conf); - taskScheduler.setTaskTrackerManager(taskTrackerManager); - } catch (ClassNotFoundException e) { - LOG.fatal("Failed to initialize the TaskScheduler", e); - System.exit(1); - } catch (InstantiationException e) { - LOG.fatal("Failed to initialize the TaskScheduler", e); - System.exit(1); - } catch (IllegalAccessException e) { - LOG.fatal("Failed to initialize the TaskScheduler", e); - System.exit(1); - } - - // Add the job listener to get job related updates. - taskTrackerManager.addJobInProgressListener(jobListener); - - LOG.info("Starting MesosScheduler"); - jobTracker = (JobTracker) super.taskTrackerManager; - - String master = conf.get("mapred.mesos.master", "local"); - - try { - FrameworkInfo frameworkInfo = FrameworkInfo - .newBuilder() - .setUser("") // Let Mesos fill in the user. 
- .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false)) - .setRole(conf.get("mapred.mesos.role", "*")) - .setName("Hadoop: (RPC port: " + jobTracker.port + "," - + " WebUI port: " + jobTracker.infoPort + ")").build(); - - driver = new MesosSchedulerDriver(this, frameworkInfo, master); - driver.start(); - } catch (Exception e) { - // If the MesosScheduler can't be loaded, the JobTracker won't be useful - // at all, so crash it now so that the user notices. - LOG.fatal("Failed to start MesosScheduler", e); - System.exit(1); - } - - String file = conf.get("mapred.mesos.state.file", ""); - if (!file.equals("")) { - this.stateFile = new File(file); - } - - policyIsFixed = conf.getBoolean("mapred.mesos.scheduler.policy.fixed", - policyIsFixed); - - if (policyIsFixed) { - policy = new ResourcePolicyFixed(this); - } else { - policy = new ResourcePolicyVariable(this); - } - - enableMetrics = conf.getBoolean("mapred.mesos.metrics.enabled", - enableMetrics); - - if (enableMetrics) { - metrics = new Metrics(conf); - } - - taskScheduler.start(); - } - - @Override - public synchronized void terminate() throws IOException { - try { - LOG.info("Stopping MesosScheduler"); - driver.stop(); - } catch (Exception e) { - LOG.error("Failed to stop Mesos scheduler", e); - } - - taskScheduler.terminate(); - } - - @Override - public void checkJobSubmission(JobInProgress job) throws IOException { - taskScheduler.checkJobSubmission(job); - } - - @Override - public List assignTasks(TaskTracker taskTracker) - throws IOException { - HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(), - taskTracker.getStatus().getHttpPort()); - - if (!mesosTrackers.containsKey(tracker)) { - LOG.info("Unknown/exited TaskTracker: " + tracker + ". "); - return null; - } - - MesosTracker mesosTracker = mesosTrackers.get(tracker); - - // Make sure we're not asked to assign tasks to any task trackers that have - // been stopped. 
This could happen while the task tracker has not been - // removed from the cluster e.g still in the heartbeat timeout period. - synchronized (this) { - if (mesosTracker.stopped) { - LOG.info("Asked to assign tasks to stopped tracker " + tracker + "."); - return null; - } - } - - // Let the underlying task scheduler do the actual task scheduling. - List tasks = taskScheduler.assignTasks(taskTracker); - - // The Hadoop Fair Scheduler is known to return null. - if (tasks == null) { - return null; - } - - // Keep track of which TaskTracker contains which tasks. - for (Task task : tasks) { - mesosTracker.jobs.add(task.getJobID()); - } - - return tasks; - } - - @Override - public synchronized Collection getJobs(String queueName) { - return taskScheduler.getJobs(queueName); - } - - @Override - public synchronized void refresh() throws IOException { - taskScheduler.refresh(); - } + // --------------------- Interface Scheduler --------------------- - // Mesos Scheduler methods. // These are synchronized, where possible. Some of these methods need to access the // JobTracker, which can lead to a deadlock: // See: https://issues.apache.org/jira/browse/MESOS-429 @@ -300,85 +162,30 @@ public synchronized void refresh() throws IOException { // state across our Scheduler and TaskScheduler implementations, and provide // atomic operations as needed. 
@Override - public synchronized void registered(SchedulerDriver schedulerDriver, - FrameworkID frameworkID, MasterInfo masterInfo) { - LOG.info("Registered as " + frameworkID.getValue() - + " with master " + masterInfo); + public synchronized void registered(SchedulerDriver schedulerDriver, FrameworkID frameworkID, MasterInfo masterInfo) { + LOG.info("Registered as " + frameworkID.getValue() + " with master " + masterInfo); } @Override - public synchronized void reregistered(SchedulerDriver schedulerDriver, - MasterInfo masterInfo) { + public synchronized void reregistered(SchedulerDriver schedulerDriver, MasterInfo masterInfo) { LOG.info("Re-registered with master " + masterInfo); } - public void killTracker(MesosTracker tracker) { - if (metrics != null) { - metrics.killMeter.mark(); - } - synchronized (this) { - driver.killTask(tracker.taskId); - } - tracker.stop(); - if (mesosTrackers.get(tracker.host) == tracker) { - mesosTrackers.remove(tracker.host); - } - } - - public synchronized void scheduleTimer(Runnable command, - long delay, - TimeUnit unit) { - timerScheduler.schedule(command, delay, unit); - } - - // For some reason, pendingMaps() and pendingReduces() doesn't return the - // values we expect. We observed negative values, which may be related to - // https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the - // algorithm that is used to calculate the pending tasks within the Hadoop - // JobTracker sources (see 'printTaskSummary' in - // src/org/apache/hadoop/mapred/jobdetails_jsp.java). 
- public int getPendingTasks(TaskInProgress[] tasks) { - int totalTasks = tasks.length; - int runningTasks = 0; - int finishedTasks = 0; - int killedTasks = 0; - for (int i = 0; i < totalTasks; ++i) { - TaskInProgress task = tasks[i]; - if (task == null) { - continue; - } - if (task.isComplete()) { - finishedTasks += 1; - } else if (task.isRunning()) { - runningTasks += 1; - } else if (task.wasKilled()) { - killedTasks += 1; - } - } - int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks; - return pendingTasks; - } - // This method uses explicit synchronization in order to avoid deadlocks when // accessing the JobTracker. @Override - public void resourceOffers(SchedulerDriver schedulerDriver, - List offers) { + public void resourceOffers(SchedulerDriver schedulerDriver, List offers) { policy.resourceOffers(schedulerDriver, offers); } @Override - public synchronized void offerRescinded(SchedulerDriver schedulerDriver, - OfferID offerID) { + public synchronized void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerID) { LOG.warn("Rescinded offer: " + offerID.getValue()); } @Override - public synchronized void statusUpdate(SchedulerDriver schedulerDriver, - Protos.TaskStatus taskStatus) { - LOG.info("Status update of " + taskStatus.getTaskId().getValue() - + " to " + taskStatus.getState().name() - + " with message " + taskStatus.getMessage()); + public synchronized void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) { + LOG.info("Status update of " + taskStatus.getTaskId().getValue() + " to " + taskStatus.getState().name() + " with message " + taskStatus.getMessage()); // Remove the TaskTracker if the corresponding Mesos task has reached a // terminal state. @@ -388,7 +195,7 @@ public synchronized void statusUpdate(SchedulerDriver schedulerDriver, case TASK_KILLED: case TASK_LOST: // Make a copy to iterate over keys and delete values. 
- Set trackers = new HashSet(mesosTrackers.keySet()); + Set trackers = new HashSet<>(mesosTrackers.keySet()); // Remove the task from the map. for (HttpHost tracker : trackers) { @@ -417,11 +224,8 @@ public synchronized void statusUpdate(SchedulerDriver schedulerDriver, } @Override - public synchronized void frameworkMessage(SchedulerDriver schedulerDriver, - ExecutorID executorID, SlaveID slaveID, byte[] bytes) { - LOG.info("Framework Message of " + bytes.length + " bytes" - + " from executor " + executorID.getValue() - + " on slave " + slaveID.getValue()); + public synchronized void frameworkMessage(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID, byte[] bytes) { + LOG.info("Framework Message of " + bytes.length + " bytes" + " from executor " + executorID.getValue() + " on slave " + slaveID.getValue()); } @Override @@ -430,20 +234,179 @@ public synchronized void disconnected(SchedulerDriver schedulerDriver) { } @Override - public synchronized void slaveLost(SchedulerDriver schedulerDriver, - SlaveID slaveID) { + public synchronized void slaveLost(SchedulerDriver schedulerDriver, SlaveID slaveID) { LOG.warn("Slave lost: " + slaveID.getValue()); } @Override - public synchronized void executorLost(SchedulerDriver schedulerDriver, - ExecutorID executorID, SlaveID slaveID, int status) { - LOG.warn("Executor " + executorID.getValue() - + " lost with status " + status + " on slave " + slaveID); + public synchronized void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID, int status) { + LOG.warn("Executor " + executorID.getValue() + " lost with status " + status + " on slave " + slaveID); } @Override public synchronized void error(SchedulerDriver schedulerDriver, String s) { LOG.error("Error from scheduler driver: " + s); } + + @Override + public List assignTasks(TaskTracker taskTracker) throws IOException { + + // Let the underlying task scheduler do the actual task scheduling. 
+ List tasks = taskScheduler.assignTasks(taskTracker); + + // The Hadoop Fair Scheduler is known to return null. + if (tasks != null) { + HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(), taskTracker.getStatus().getHttpPort()); + MesosTracker mesosTracker = mesosTrackers.get(tracker); + synchronized (this) { + if (mesosTracker != null) { + if (mesosTracker.stopped) { + // Make sure we're not asked to assign tasks to any task trackers that have + // been stopped. This could happen while the task tracker has not been + // removed from the cluster e.g still in the heartbeat timeout period. + LOG.info("Asked to assign tasks to stopped tracker " + tracker + "."); + return null; + } else { + // Keep track of which TaskTracker contains which tasks. + for (Task task : tasks) { + mesosTracker.jobs.add(task.getJobID()); + } + } + } + } + } + + return tasks; + } + + @Override + public void checkJobSubmission(JobInProgress job) throws IOException { + taskScheduler.checkJobSubmission(job); + } + + @Override + public synchronized Collection getJobs(String queueName) { + return taskScheduler.getJobs(queueName); + } + + /** + * For some reason, pendingMaps() and pendingReduces() don't return the values we expect. We observed negative + * values, which may be related to https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the algorithm + * that is used to calculate the pending tasks within the Hadoop JobTracker sources (see 'printTaskSummary' in + * src/org/apache/hadoop/mapred/jobdetails_jsp.java). 
+ * + * @param tasks the tasks to count; null entries are skipped and therefore remain counted as pending + * @return tasks.length minus the number of complete, running and killed tasks + */ + public int getPendingTasks(TaskInProgress[] tasks) { + int totalTasks = tasks.length; + int runningTasks = 0; + int finishedTasks = 0; + int killedTasks = 0; + for (TaskInProgress task : tasks) { + if (task == null) { + continue; + } + if (task.isComplete()) { + finishedTasks += 1; + } else if (task.isRunning()) { + runningTasks += 1; + } else if (task.wasKilled()) { + killedTasks += 1; + } + } + return totalTasks - runningTasks - killedTasks - finishedTasks; + } + + public void killTracker(MesosTracker tracker) { + if (metrics != null) { + metrics.killMeter.mark(); + } + synchronized (this) { + driver.killTask(tracker.taskId); + } + tracker.stop(); + if (mesosTrackers.get(tracker.host) == tracker) { + mesosTrackers.remove(tracker.host); + } + } + + @Override + public synchronized void refresh() throws IOException { + taskScheduler.refresh(); + } + + public synchronized void scheduleTimer(Runnable command, long delay, TimeUnit unit) { + timerScheduler.schedule(command, delay, unit); + } + + // TaskScheduler methods. + @Override + public synchronized void start() throws IOException { + conf = getConf(); + String taskTrackerClass = conf.get("mapred.mesos.taskScheduler", "org.apache.hadoop.mapred.JobQueueTaskScheduler"); + + try { + taskScheduler = (TaskScheduler) Class.forName(taskTrackerClass).newInstance(); + taskScheduler.setConf(conf); + taskScheduler.setTaskTrackerManager(taskTrackerManager); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + LOG.fatal("Failed to initialize the TaskScheduler", e); + System.exit(1); + } + + // Add the job listener to get job related updates. 
+ taskTrackerManager.addJobInProgressListener(jobListener); + + LOG.info("Starting MesosScheduler"); + jobTracker = (JobTracker) super.taskTrackerManager; + + String master = conf.get("mapred.mesos.master", "local"); + + try { + FrameworkInfo frameworkInfo = FrameworkInfo.newBuilder().setUser("") // Let Mesos fill in the user. + .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false)).setRole(conf.get("mapred.mesos.role", "*")).setName("Hadoop: (RPC port: " + jobTracker.port + "," + " WebUI port: " + jobTracker.infoPort + ")").build(); + + driver = new MesosSchedulerDriver(this, frameworkInfo, master); + driver.start(); + } catch (Exception e) { + // If the MesosScheduler can't be loaded, the JobTracker won't be useful + // at all, so crash it now so that the user notices. + LOG.fatal("Failed to start MesosScheduler", e); + System.exit(1); + } + + String file = conf.get("mapred.mesos.state.file", ""); + if (!file.equals("")) { + this.stateFile = new File(file); + } + + policyIsFixed = conf.getBoolean("mapred.mesos.scheduler.policy.fixed", policyIsFixed); + + if (policyIsFixed) { + policy = new ResourcePolicyFixed(this); + } else { + policy = new ResourcePolicyVariable(this); + } + + enableMetrics = conf.getBoolean("mapred.mesos.metrics.enabled", enableMetrics); + + if (enableMetrics) { + metrics = new Metrics(conf); + } + + taskScheduler.start(); + } + + @Override + public synchronized void terminate() throws IOException { + try { + LOG.info("Stopping MesosScheduler"); + driver.stop(); + } catch (Exception e) { + LOG.error("Failed to stop Mesos scheduler", e); + } + + taskScheduler.terminate(); + } } diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index 021e0ea..7424ede 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -16,7 +16,6 @@ * Used to track the our launched TaskTrackers. 
*/ public class MesosTracker { - public static final Log LOG = LogFactory.getLog(MesosScheduler.class); public volatile HttpHost host; public TaskID taskId; @@ -91,6 +90,30 @@ public void run() { }, MesosScheduler.LAUNCH_TIMEOUT_MS, TimeUnit.MILLISECONDS); } + protected void schedulePeriodic() { + scheduler.scheduleTimer(new Runnable() { + @Override + public void run() { + if (MesosTracker.this.scheduler.mesosTrackers.containsKey(host) && + MesosTracker.this == MesosTracker.this.scheduler.mesosTrackers.get(host)) { + // Periodically check if the jobs assigned to this TaskTracker are + // still running (lazy GC). + final Set jobsCopy = new HashSet<>(MesosTracker.this.jobs); + for (JobID id : jobsCopy) { + JobStatus jobStatus = MesosTracker.this.scheduler.jobTracker.getJobStatus(id); + if (jobStatus == null || jobStatus.isJobComplete()) { + if (MesosTracker.this.scheduler.metrics != null) { + MesosTracker.this.scheduler.metrics.periodicGC.mark(); + } + MesosTracker.this.jobs.remove(id); + } + } + schedulePeriodic(); + } + } + }, MesosScheduler.PERIODIC_MS, TimeUnit.MILLISECONDS); + } + protected void scheduleIdleCheck() { scheduler.scheduleTimer(new Runnable() { @Override @@ -107,7 +130,7 @@ public void run() { return; } - boolean trackerIsIdle = false; + boolean trackerIsIdle; // We're only interested in TaskTrackers which have jobs assigned to them // but are completely idle. The MesosScheduler is in charge of destroying @@ -150,30 +173,6 @@ public void run() { }, MesosTracker.this.idleCheckInterval, TimeUnit.SECONDS); } - protected void schedulePeriodic() { - scheduler.scheduleTimer(new Runnable() { - @Override - public void run() { - if (MesosTracker.this.scheduler.mesosTrackers.containsKey(host) && - MesosTracker.this == MesosTracker.this.scheduler.mesosTrackers.get(host)) { - // Periodically check if the jobs assigned to this TaskTracker are - // still running (lazy GC). 
- final Set jobsCopy = new HashSet(MesosTracker.this.jobs); - for (JobID id : jobsCopy) { - JobStatus jobStatus = MesosTracker.this.scheduler.jobTracker.getJobStatus(id); - if (jobStatus == null || jobStatus.isJobComplete()) { - if (MesosTracker.this.scheduler.metrics != null) { - MesosTracker.this.scheduler.metrics.periodicGC.mark(); - } - MesosTracker.this.jobs.remove(id); - } - } - schedulePeriodic(); - } - } - }, MesosScheduler.PERIODIC_MS, TimeUnit.MILLISECONDS); - } - public void stop() { active = false; stopped = true; diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index 3a52888..5d1e643 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -17,7 +17,7 @@ import static org.apache.hadoop.util.StringUtils.join; -public class ResourcePolicy { +public abstract class ResourcePolicy { public static final Log LOG = LogFactory.getLog(ResourcePolicy.class); public volatile MesosScheduler scheduler; public int neededMapSlots; @@ -71,142 +71,13 @@ public ResourcePolicy(MesosScheduler scheduler) { containerMem = tasktrackerMem; } - public void computeNeededSlots(List jobsInProgress, - Collection taskTrackers) { - // Compute the number of pending maps and reduces. - int pendingMaps = 0; - int pendingReduces = 0; - int runningMaps = 0; - int runningReduces = 0; - - for (JobInProgress progress : jobsInProgress) { - // JobStatus.pendingMaps/Reduces may return the wrong value on - // occasion. This seems to be safer. - pendingMaps += scheduler.getPendingTasks(progress.getTasks(TaskType.MAP)); - pendingReduces += scheduler.getPendingTasks(progress.getTasks(TaskType.REDUCE)); - runningMaps += progress.runningMaps(); - runningReduces += progress.runningReduces(); - - // If the task is waiting to launch the cleanup task, let us make sure we have - // capacity to run the task. 
- if (!progress.isCleanupLaunched()) { - pendingMaps += scheduler.getPendingTasks(progress.getTasks(TaskType.JOB_CLEANUP)); - } - } - - // Mark active (heartbeated) TaskTrackers and compute idle slots. - int idleMapSlots = 0; - int idleReduceSlots = 0; - int unhealthyTrackers = 0; - - for (TaskTrackerStatus status : taskTrackers) { - if (!status.getHealthStatus().isNodeHealthy()) { - // Skip this node if it's unhealthy. - ++unhealthyTrackers; - continue; - } - - HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); - if (scheduler.mesosTrackers.containsKey(host)) { - scheduler.mesosTrackers.get(host).active = true; - idleMapSlots += status.getAvailableMapSlots(); - idleReduceSlots += status.getAvailableReduceSlots(); - } - } - - // Consider the TaskTrackers that have yet to become active as being idle, - // otherwise we will launch excessive TaskTrackers. - int inactiveMapSlots = 0; - int inactiveReduceSlots = 0; - for (MesosTracker tracker : scheduler.mesosTrackers.values()) { - if (!tracker.active) { - inactiveMapSlots += tracker.mapSlots; - inactiveReduceSlots += tracker.reduceSlots; - } - } - - // To ensure Hadoop jobs begin promptly, we can specify a minimum number - // of 'hot slots' to be available for use. This addresses the - // TaskTracker spin up delay that exists with Hadoop on Mesos. This can - // be a nuisance with lower latency applications, such as ad-hoc Hive - // queries. - int minimumMapSlots = scheduler.conf.getInt("mapred.mesos.total.map.slots.minimum", 0); - int minimumReduceSlots = - scheduler.conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0); - - // Compute how many slots we need to allocate. 
- neededMapSlots = Math.max( - minimumMapSlots - (idleMapSlots + inactiveMapSlots), - pendingMaps - (idleMapSlots + inactiveMapSlots)); - neededReduceSlots = Math.max( - minimumReduceSlots - (idleReduceSlots + inactiveReduceSlots), - pendingReduces - (idleReduceSlots + inactiveReduceSlots)); - - LOG.info(join("\n", Arrays.asList( - "JobTracker Status", - " Pending Map Tasks: " + pendingMaps, - " Pending Reduce Tasks: " + pendingReduces, - " Running Map Tasks: " + runningMaps, - " Running Reduce Tasks: " + runningReduces, - " Idle Map Slots: " + idleMapSlots, - " Idle Reduce Slots: " + idleReduceSlots, - " Inactive Map Slots: " + inactiveMapSlots - + " (launched but no hearbeat yet)", - " Inactive Reduce Slots: " + inactiveReduceSlots - + " (launched but no hearbeat yet)", - " Needed Map Slots: " + neededMapSlots, - " Needed Reduce Slots: " + neededReduceSlots, - " Unhealthy Trackers: " + unhealthyTrackers))); - - if (scheduler.stateFile != null) { - // Update state file - synchronized (this) { - Set hosts = new HashSet(); - for (MesosTracker tracker : scheduler.mesosTrackers.values()) { - hosts.add(tracker.host.getHostName()); - } - try { - File tmp = new File(scheduler.stateFile.getAbsoluteFile() + ".tmp"); - FileWriter fstream = new FileWriter(tmp); - fstream.write(join("\n", Arrays.asList( - "time=" + System.currentTimeMillis(), - "pendingMaps=" + pendingMaps, - "pendingReduces=" + pendingReduces, - "runningMaps=" + runningMaps, - "runningReduces=" + runningReduces, - "idleMapSlots=" + idleMapSlots, - "idleReduceSlots=" + idleReduceSlots, - "inactiveMapSlots=" + inactiveMapSlots, - "inactiveReduceSlots=" + inactiveReduceSlots, - "neededMapSlots=" + neededMapSlots, - "neededReduceSlots=" + neededReduceSlots, - "unhealthyTrackers=" + unhealthyTrackers, - "hosts=" + join(",", hosts), - ""))); - fstream.close(); - tmp.renameTo(scheduler.stateFile); - } catch (Exception e) { - LOG.error("Can't write state file: " + e.getMessage()); - } - } - } - } - - // This method 
computes the number of slots to launch for this offer, and - // returns true if the offer is sufficient. - // Must be overridden. - public boolean computeSlots() { - return false; - } - - public void resourceOffers(SchedulerDriver schedulerDriver, - List offers) { - final HttpHost jobTrackerAddress = - new HttpHost(scheduler.jobTracker.getHostname(), scheduler.jobTracker.getTrackerPort()); + public void resourceOffers(SchedulerDriver schedulerDriver, List offers) { +// final HttpHost jobTrackerAddress = +// new HttpHost(scheduler.jobTracker.getHostname(), scheduler.jobTracker.getTrackerPort()); final Collection taskTrackers = scheduler.jobTracker.taskTrackers(); - final List jobsInProgress = new ArrayList(); + final List jobsInProgress = new ArrayList<>(); for (JobStatus status : scheduler.jobTracker.jobsToComplete()) { jobsInProgress.add(scheduler.jobTracker.getJob(status.getJobID())); } @@ -229,7 +100,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, mem = -1.0; disk = -1.0; Set ports = new HashSet(); - String cpuRole = new String("*"); + String cpuRole = "*"; String memRole = cpuRole; String diskRole = cpuRole; String portsRole = cpuRole; @@ -385,7 +256,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, if (master == null) { throw new RuntimeException( "Expecting configuration property 'mapred.mesos.master'"); - } else if (master == "local") { + } else if (Objects.equals(master, "local")) { throw new RuntimeException( "Can not use 'local' for 'mapred.mesos.executor'"); } @@ -435,8 +306,8 @@ public void resourceOffers(SchedulerDriver schedulerDriver, } if (containerOptions != null) { - for (int i = 0; i < containerOptions.length; i++) { - containerInfo.addOptions(containerOptions[i]); + for (String containerOption : containerOptions) { + containerInfo.addOptions(containerOption); } } @@ -559,4 +430,133 @@ public void resourceOffers(SchedulerDriver schedulerDriver, } } } + + public void computeNeededSlots(List jobsInProgress, + 
Collection taskTrackers) { + // Compute the number of pending maps and reduces. + int pendingMaps = 0; + int pendingReduces = 0; + int runningMaps = 0; + int runningReduces = 0; + + for (JobInProgress progress : jobsInProgress) { + // JobStatus.pendingMaps/Reduces may return the wrong value on + // occasion. This seems to be safer. + pendingMaps += scheduler.getPendingTasks(progress.getTasks(TaskType.MAP)); + pendingReduces += scheduler.getPendingTasks(progress.getTasks(TaskType.REDUCE)); + runningMaps += progress.runningMaps(); + runningReduces += progress.runningReduces(); + + // If the task is waiting to launch the cleanup task, let us make sure we have + // capacity to run the task. + if (!progress.isCleanupLaunched()) { + pendingMaps += scheduler.getPendingTasks(progress.getTasks(TaskType.JOB_CLEANUP)); + } + } + + // Mark active (heartbeated) TaskTrackers and compute idle slots. + int idleMapSlots = 0; + int idleReduceSlots = 0; + int unhealthyTrackers = 0; + + for (TaskTrackerStatus status : taskTrackers) { + if (!status.getHealthStatus().isNodeHealthy()) { + // Skip this node if it's unhealthy. + ++unhealthyTrackers; + continue; + } + + HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); + if (scheduler.mesosTrackers.containsKey(host)) { + scheduler.mesosTrackers.get(host).active = true; + idleMapSlots += status.getAvailableMapSlots(); + idleReduceSlots += status.getAvailableReduceSlots(); + } + } + + // Consider the TaskTrackers that have yet to become active as being idle, + // otherwise we will launch excessive TaskTrackers. + int inactiveMapSlots = 0; + int inactiveReduceSlots = 0; + for (MesosTracker tracker : scheduler.mesosTrackers.values()) { + if (!tracker.active) { + inactiveMapSlots += tracker.mapSlots; + inactiveReduceSlots += tracker.reduceSlots; + } + } + + // To ensure Hadoop jobs begin promptly, we can specify a minimum number + // of 'hot slots' to be available for use. 
This addresses the + // TaskTracker spin up delay that exists with Hadoop on Mesos. This can + // be a nuisance with lower latency applications, such as ad-hoc Hive + // queries. + int minimumMapSlots = scheduler.conf.getInt("mapred.mesos.total.map.slots.minimum", 0); + int minimumReduceSlots = + scheduler.conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0); + + // Compute how many slots we need to allocate. + neededMapSlots = Math.max( + minimumMapSlots - (idleMapSlots + inactiveMapSlots), + pendingMaps - (idleMapSlots + inactiveMapSlots)); + neededReduceSlots = Math.max( + minimumReduceSlots - (idleReduceSlots + inactiveReduceSlots), + pendingReduces - (idleReduceSlots + inactiveReduceSlots)); + + LOG.info(join("\n", Arrays.asList( + "JobTracker Status", + " Pending Map Tasks: " + pendingMaps, + " Pending Reduce Tasks: " + pendingReduces, + " Running Map Tasks: " + runningMaps, + " Running Reduce Tasks: " + runningReduces, + " Idle Map Slots: " + idleMapSlots, + " Idle Reduce Slots: " + idleReduceSlots, + " Inactive Map Slots: " + inactiveMapSlots + + " (launched but no hearbeat yet)", + " Inactive Reduce Slots: " + inactiveReduceSlots + + " (launched but no hearbeat yet)", + " Needed Map Slots: " + neededMapSlots, + " Needed Reduce Slots: " + neededReduceSlots, + " Unhealthy Trackers: " + unhealthyTrackers))); + + File stateFile = scheduler.stateFile; + if (stateFile != null) { + // Update state file + synchronized (this) { + Set hosts = new HashSet<>(); + for (MesosTracker tracker : scheduler.mesosTrackers.values()) { + hosts.add(tracker.host.getHostName()); + } + try { + File tmp = new File(stateFile.getAbsoluteFile() + ".tmp"); + FileWriter fstream = new FileWriter(tmp); + fstream.write(join("\n", Arrays.asList( + "time=" + System.currentTimeMillis(), + "pendingMaps=" + pendingMaps, + "pendingReduces=" + pendingReduces, + "runningMaps=" + runningMaps, + "runningReduces=" + runningReduces, + "idleMapSlots=" + idleMapSlots, + "idleReduceSlots=" + 
idleReduceSlots, + "inactiveMapSlots=" + inactiveMapSlots, + "inactiveReduceSlots=" + inactiveReduceSlots, + "neededMapSlots=" + neededMapSlots, + "neededReduceSlots=" + neededReduceSlots, + "unhealthyTrackers=" + unhealthyTrackers, + "hosts=" + join(",", hosts), + ""))); + fstream.close(); + if (!tmp.renameTo(stateFile)) { + LOG.error("Can't overwrite state " + stateFile.getAbsolutePath()); + } + } catch (Exception e) { + LOG.error("Can't write state file: " + e.getMessage()); + } + } + } + } + + // This method computes the number of slots to launch for this offer, and + // returns true if the offer is sufficient. + // Must be overridden. + public abstract boolean computeSlots(); } diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicyFixed.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicyFixed.java index 2898c4f..a567c76 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicyFixed.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicyFixed.java @@ -1,18 +1,18 @@ package org.apache.hadoop.mapred; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - +/** + * @todo What is the difference between variable and fixed resource policy? + */ public class ResourcePolicyFixed extends ResourcePolicy { - - public static final Log LOG = LogFactory.getLog(ResourcePolicyFixed.class); - public ResourcePolicyFixed(MesosScheduler scheduler) { super(scheduler); } - // This method computes the number of slots to launch for this offer, and - // returns true if the offer is sufficient. + /** + * Computes the number of slots to launch for this offer + * + * @return true if the offer is sufficient + */ @Override public boolean computeSlots() { mapSlots = mapSlotsMax; @@ -24,9 +24,6 @@ public boolean computeSlots() { slots = (int) Math.min(slots, (disk - containerDisk) / slotDisk); // Is this offer too small for even the minimum slots? 
- if (slots < mapSlots + reduceSlots || slots < 1) { - return false; - } - return true; + return slots >= 1 && slots >= mapSlots + reduceSlots; } } diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicyVariable.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicyVariable.java index 7d9888a..c7c9c10 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicyVariable.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicyVariable.java @@ -1,61 +1,62 @@ package org.apache.hadoop.mapred; +/** + * @todo What is the difference between variable and fixed resource policy? + */ public class ResourcePolicyVariable extends ResourcePolicy { - public ResourcePolicyVariable(MesosScheduler scheduler) { - super(scheduler); - } + public ResourcePolicyVariable(MesosScheduler scheduler) { + super(scheduler); + } - // This method computes the number of slots to launch for this offer, and - // returns true if the offer is sufficient. - @Override - public boolean computeSlots() { - // What's the minimum number of map and reduce slots we should try to - // launch? - mapSlots = 0; - reduceSlots = 0; + /** + * Computes the number of slots to launch for this offer + * + * @return true if the offer is sufficient + */ + @Override + public boolean computeSlots() { + // What's the minimum number of map and reduce slots we should try to + // launch? + mapSlots = 0; + reduceSlots = 0; - // Determine how many slots we can allocate. - int slots = mapSlotsMax + reduceSlotsMax; - slots = (int) Math.min(slots, (cpus - containerCpus) / slotCpus); - slots = (int) Math.min(slots, (mem - containerMem) / slotMem); - slots = (int) Math.min(slots, (disk - containerDisk) / slotDisk); + // Determine how many slots we can allocate. 
+ int slots = mapSlotsMax + reduceSlotsMax; + slots = (int) Math.min(slots, (cpus - containerCpus) / slotCpus); + slots = (int) Math.min(slots, (mem - containerMem) / slotMem); + slots = (int) Math.min(slots, (disk - containerDisk) / slotDisk); - // Is this offer too small for even the minimum slots? - if (slots < 1) { - return false; - } + // Is this offer too small for even the minimum slots? + if (slots < 1) { + return false; + } - // Is the number of slots we need sufficiently small? If so, we can - // allocate exactly the number we need. - if (slots >= neededMapSlots + neededReduceSlots && neededMapSlots < - mapSlotsMax && neededReduceSlots < reduceSlotsMax) { - mapSlots = neededMapSlots; - reduceSlots = neededReduceSlots; - } else { - // Allocate slots fairly for this resource offer. - double mapFactor = (double) neededMapSlots / (neededMapSlots + neededReduceSlots); - double reduceFactor = (double) neededReduceSlots / (neededMapSlots + neededReduceSlots); - // To avoid map/reduce slot starvation, don't allow more than 50% - // spread between map/reduce slots when we need both mappers and - // reducers. - if (neededMapSlots > 0 && neededReduceSlots > 0) { - if (mapFactor < 0.25) { - mapFactor = 0.25; - } else if (mapFactor > 0.75) { - mapFactor = 0.75; - } - if (reduceFactor < 0.25) { - reduceFactor = 0.25; - } else if (reduceFactor > 0.75) { - reduceFactor = 0.75; - } + // Is the number of slots we need sufficiently small? If so, we can + // allocate exactly the number we need. + if (slots >= neededMapSlots + neededReduceSlots + && neededMapSlots < mapSlotsMax + && neededReduceSlots < reduceSlotsMax) { + mapSlots = neededMapSlots; + reduceSlots = neededReduceSlots; + } else { + // Allocate slots fairly for this resource offer. + double mapFactor = (double) neededMapSlots / (neededMapSlots + neededReduceSlots); + // To avoid map/reduce slot starvation, don't allow more than 50% + // spread between map/reduce slots when we need both mappers and + // reducers. 
+ if (neededMapSlots > 0 && neededReduceSlots > 0) { + if (mapFactor < 0.25) { + mapFactor = 0.25; + } else if (mapFactor > 0.75) { + mapFactor = 0.75; } - mapSlots = Math.min(Math.min((long)Math.max(Math.round(mapFactor * slots), 1), mapSlotsMax), neededMapSlots); - - // The remaining slots are allocated for reduces. - slots -= mapSlots; - reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots); } - return true; + mapSlots = Math.min(Math.min(Math.max(Math.round(mapFactor * slots), 1), mapSlotsMax), neededMapSlots); + + // The remaining slots are allocated for reduces. + slots -= mapSlots; + reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots); } + return true; } +} From 6e93e75a086e8c27aec559e8df4e92b834ac1931 Mon Sep 17 00:00:00 2001 From: Brian Topping Date: Sat, 9 May 2015 15:00:23 +0700 Subject: [PATCH 04/14] Fixes per PR changes. Addition of release plugin to follow. --- pom.xml | 1 - .../apache/hadoop/mapred/MesosExecutor.java | 82 +++++++------------ 2 files changed, 30 insertions(+), 53 deletions(-) diff --git a/pom.xml b/pom.xml index 4c82358..6633b17 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,6 @@ 1.1.3 3.1 - 2.5.0-mr1-cdh5.2.0 0.21.0 2.5.0 diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index d2ab783..4d592be 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -112,7 +112,21 @@ public void killTask(final ExecutorDriver driver, final TaskID taskId) { LOG.info("Killing task : " + taskId.getValue()); if (taskTracker != null) { LOG.info("Revoking task tracker map/reduce slots"); - revokeSlots(); + + // terminate the task launchers + try { + killLauncher(taskTracker, "mapLauncher"); + killLauncher(taskTracker, "reduceLauncher"); + } catch (ReflectiveOperationException e) { + LOG.fatal("Failed updating map slots due to error with reflection", e); 
+ } + + // Configure the new slot counts on the task tracker + taskTracker.setMaxMapSlots(0); + taskTracker.setMaxReduceSlots(0); + + // commit suicide when no jobs are running + scheduleSuicideTimer(); // Send the TASK_FINISHED status new Thread("TaskFinishedUpdate") { @@ -176,57 +190,21 @@ private JobConf configure(final TaskInfo task) { return conf; } - public void revokeSlots() { - if (taskTracker == null) { - LOG.error("Task tracker is not initialized"); - return; - } - - int maxMapSlots = 0; - int maxReduceSlots = 0; - - // TODO(tarnfeld): Sanity check that it's safe for us to change the slots. - // Be sure there's nothing running and nothing in the launcher queue. - - // If we expect to have no slots, let's go ahead and terminate the task launchers - // CONDITION IS ALWAYS TRUE - if (maxMapSlots == 0) { - try { - Field launcherField = taskTracker.getClass().getDeclaredField("mapLauncher"); - launcherField.setAccessible(true); - - // Kill the current map task launcher - TaskTracker.TaskLauncher launcher = ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)); - launcher.notifySlots(); - launcher.interrupt(); - } catch (ReflectiveOperationException e) { - LOG.fatal("Failed updating map slots due to error with reflection", e); - } - } - - // CONDITION IS ALWAYS TRUE - if (maxReduceSlots == 0) { - try { - Field launcherField = taskTracker.getClass().getDeclaredField("reduceLauncher"); - launcherField.setAccessible(true); - - // Kill the current reduce task launcher - TaskTracker.TaskLauncher launcher = ((TaskTracker.TaskLauncher) launcherField.get(taskTracker)); - launcher.notifySlots(); - launcher.interrupt(); - } catch (ReflectiveOperationException e) { - LOG.fatal("Failed updating reduce slots due to error with reflection", e); - } - } - - // Configure the new slot counts on the task tracker - taskTracker.setMaxMapSlots(maxMapSlots); - taskTracker.setMaxReduceSlots(maxReduceSlots); - - // If we have zero slots left, commit suicide when no jobs are 
running - if ((maxMapSlots + maxReduceSlots) == 0) { - scheduleSuicideTimer(); - } + /** + * This is a hack to over + * @param tracker + * @param name + * @throws NoSuchFieldException + * @throws IllegalAccessException + */ + private void killLauncher(TaskTracker tracker, String name) throws NoSuchFieldException, IllegalAccessException { + Field f = tracker.getClass().getDeclaredField(name); + f.setAccessible(true); + + // Kill the current map task launcher + TaskTracker.TaskLauncher launcher = ((TaskTracker.TaskLauncher) f.get(tracker)); + launcher.notifySlots(); + launcher.interrupt(); } protected void scheduleSuicideTimer() { From ee1c8762b12e0400e742c48e1c75288e23678cb3 Mon Sep 17 00:00:00 2001 From: Brian Topping Date: Sat, 9 May 2015 15:34:57 +0700 Subject: [PATCH 05/14] Missed a comment, rolled back version change until another PR. --- pom.xml | 11 +++++++++-- .../java/org/apache/hadoop/mapred/MesosExecutor.java | 7 ++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 6633b17..85379c4 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.apache.mesos hadoop-mesos - 0.1.1-SNAPSHOT + 0.1.0 UTF-8 @@ -160,7 +160,14 @@ - + + org.apache.maven.plugins + maven-release-plugin + 2.5.1 + + + + diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 4d592be..7bba393 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -191,9 +191,10 @@ private JobConf configure(final TaskInfo task) { } /** - * This is a hack to over - * @param tracker - * @param name + * This is a hack to overcome lack of accessibility of the launcher. Will solicit feedback from Hadoop list. 
+ * + * @param tracker tracker with launcher we want to kill + * @param name name of the field containing the launcher * @throws NoSuchFieldException * @throws IllegalAccessException */ From 6352915d3423539a730d32ba89b6ebd47f540dd0 Mon Sep 17 00:00:00 2001 From: Brian Topping Date: Tue, 12 May 2015 16:56:43 +0700 Subject: [PATCH 06/14] Add release plugin and changes to facilitate publishing to Sonatype OSS --- pom.xml | 64 +++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/pom.xml b/pom.xml index f9b0832..f75f5a3 100644 --- a/pom.xml +++ b/pom.xml @@ -1,10 +1,18 @@ - + + + oss-parent + org.sonatype.oss + 9 + + 4.0.0 - org.apache.mesos - hadoop-mesos - 0.1.0 + com.github.mesos + mesos-hadoop-mr1 + 0.1.1-SNAPSHOT + + https://github.com/mesos/hadoop UTF-8 @@ -15,8 +23,8 @@ 1.1.3 3.1 - 2.5.0-mr1-cdh5.2.0 - 0.21.0 + 2.6.0-mr1-cdh5.4.0 + 0.21.1 2.5.0 3.1.0 1.0.5 @@ -26,18 +34,6 @@ 1.9.5 - - - - cloudera - https://repository.cloudera.com/artifactory/cloudera-repos/ - - - clojars.org - http://clojars.org/repo - - - commons-logging @@ -108,23 +104,23 @@ - org.xerial.snappy - snappy-java - ${snappy-java.version} + org.xerial.snappy + snappy-java + ${snappy-java.version} - junit + junit junit ${junit.version} test + - org.apache.maven.plugins maven-compiler-plugin 3.1 @@ -134,9 +130,7 @@ true - - org.apache.maven.plugins maven-shade-plugin 2.2 @@ -160,7 +154,27 @@ + + org.apache.maven.plugins + maven-release-plugin + + + + cloudera + https://repository.cloudera.com/artifactory/cloudera-repos/ + + + clojars.org + http://clojars.org/repo + + + + + scm:git:git@github.com:mesos/hadoop.git + scm:git@github.com:mesos/hadoop.git + scm:git:git@github.com:mesos/hadoop.git + From b413b252f54b2ee25b57a0bf3a28685afc9061e2 Mon Sep 17 00:00:00 2001 From: dbjohnson1978 Date: Thu, 30 Apr 2015 23:06:12 -0400 Subject: [PATCH 07/14] Added Framework Authentication. Modified `MesosScheduler.java` and `configuration.md`. 
Now `mapred.mesos.framework.principal`, `mapred.mesos.framework.secretfile`, `mapred.mesos.framework.user`, and `mapred.mesos.framework.name` are configurable options. Addresses issue #53 Added Support for Framework Authentication Added Support for Framework Authentication --- configuration.md | 30 ++++++++++++++++ .../apache/hadoop/mapred/MesosScheduler.java | 35 ++++++++++++++++--- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/configuration.md b/configuration.md index 8990d58..ba6ae37 100644 --- a/configuration.md +++ b/configuration.md @@ -141,6 +141,36 @@ default values. role configured in "mapred.mesos.role". + + mapred.mesos.framework.name + hadoop + + This is the Mesos framework name. Defaults to Hadoop plus port information. + + + + mapred.mesos.framework.principal + hadoop + + This is the Mesos framework principal. It is used for framework authentication. + Consult the Mesos documentation for details. + + + + mapred.mesos.framework.secretfile + /location/secretfile + + Location of the file holding the Mesos framework secret. It is used for framework authentication. + Consult the Mesos documentation for details. Caution: avoid newline characters; some editors place these before the end of the file. + + + + mapred.mesos.framework.user + hadoop + + This is the user the Mesos framework runs as. If left unset, it defaults to the user running the scheduler. 
+ + diff --git a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java index 6577fe1..ea9a6d6 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java @@ -1,6 +1,7 @@ package org.apache.hadoop.mapred; import com.codahale.metrics.Meter; +import com.google.protobuf.ByteString; import org.apache.commons.httpclient.HttpHost; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -14,6 +15,7 @@ import org.apache.mesos.hadoop.Metrics; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -364,10 +366,35 @@ public synchronized void start() throws IOException { String master = conf.get("mapred.mesos.master", "local"); try { - FrameworkInfo frameworkInfo = FrameworkInfo.newBuilder().setUser("") // Let Mesos fill in the user. - .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false)).setRole(conf.get("mapred.mesos.role", "*")).setName("Hadoop: (RPC port: " + jobTracker.port + "," + " WebUI port: " + jobTracker.infoPort + ")").build(); - - driver = new MesosSchedulerDriver(this, frameworkInfo, master); + FrameworkInfo frameworkInfo; + FrameworkInfo.Builder frameworkInfoBuilder = FrameworkInfo.newBuilder() + .setUser(conf.get("mapred.mesos.framework.user", "")) // Let Mesos fill in the user. 
+ .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false)) + .setRole(conf.get("mapred.mesos.role", "*")) + .setName(conf.get("mapred.mesos.framework.name", "Hadoop: (RPC port: " + jobTracker.port + "," + + " WebUI port: " + jobTracker.infoPort + ")")); + + Credential credential = null; + + String frameworkPrincipal = conf.get("mapred.mesos.framework.principal"); + if (frameworkPrincipal != null) { + frameworkInfoBuilder.setPrincipal(frameworkPrincipal); + String secretFile = conf.get("mapred.mesos.framework.secretfile"); + if (secretFile != null) { + credential = Credential.newBuilder() + .setSecret(ByteString.readFrom(new FileInputStream(secretFile))) + .setPrincipal(frameworkPrincipal) + .build(); + } + } + if (credential == null) { + LOG.info("Creating Schedule Driver"); + driver = new MesosSchedulerDriver(this, frameworkInfoBuilder.build(), master); + } else { + LOG.info("Creatingg Schedule Driver, attempting to authenticate with Principal: " + credential.getPrincipal() + + ", secret:" + credential.getSecret()); + driver = new MesosSchedulerDriver(this, frameworkInfoBuilder.build(), master, credential); + } driver.start(); } catch (Exception e) { // If the MesosScheduler can't be loaded, the JobTracker won't be useful From d6b0d0cf48e90bd93965d5d45f103663f3c4daec Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Tue, 19 May 2015 10:03:26 +0100 Subject: [PATCH 08/14] Sync the tracker.stop() call to the scheduler --- .../java/org/apache/hadoop/mapred/MesosScheduler.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java index 6577fe1..c2e3584 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java @@ -324,10 +324,11 @@ public void killTracker(MesosTracker tracker) { } synchronized (this) { driver.killTask(tracker.taskId); - } - 
tracker.stop(); - if (mesosTrackers.get(tracker.host) == tracker) { - mesosTrackers.remove(tracker.host); + + tracker.stop(); + if (mesosTrackers.get(tracker.host) == tracker) { + mesosTrackers.remove(tracker.host); + } } } From f547cb0c9d2e17c59a755d4b283a46fa8576b72b Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Tue, 19 May 2015 10:03:44 +0100 Subject: [PATCH 09/14] Refactor code to determine if a TT is idle Previously the "idle check" would be run against all task trackers regardless of whether they have any jobs assigned to them or not. The main MesosScheduler is responsible for cleaning up task trackers once jobs have *finished* so this change stops us performing idle checks on trackers that have no jobs. This change fixes the observed behaviour of task trackers being killed and respawning continuously when they're waiting for jobs (e.g with the min map/reduce slot config option, or the fixed resource policy). --- .../apache/hadoop/mapred/MesosTracker.java | 87 ++++++++++--------- 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index 7424ede..eb73660 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -118,57 +118,58 @@ protected void scheduleIdleCheck() { scheduler.scheduleTimer(new Runnable() { @Override public void run() { - // We're not interested if the task tracker has been stopped. - if (MesosTracker.this.stopped) { - return; - } - - // If the task tracker isn't active, wait until it is active. - // TODO(tarnfeld): Do this based on some kind of lock/wait? - if (!MesosTracker.this.active) { - scheduleIdleCheck(); - return; - } + synchronized (MesosTracker.this.scheduler) { - boolean trackerIsIdle; - - // We're only interested in TaskTrackers which have jobs assigned to them - // but are completely idle. 
The MesosScheduler is in charge of destroying - // task trackers that are not handling any jobs, so we can leave those alone. - if (MesosTracker.this.idleCounter >= MesosTracker.this.idleCheckMax) { - LOG.info("Killing idle tasktracker: " + MesosTracker.this.host); - MesosTracker.this.scheduler.killTracker(MesosTracker.this); - scheduleIdleCheck(); - return; - } + // Stop the idle check timer if the scheduler has been stopped. + if (MesosTracker.this.stopped) { + return; + } - long idleMapSlots = 0; - long idleReduceSlots = 0; + // If the task tracker isn't active, wait until it is active. + // If the task tracker has no jobs assigned to it, ignore it. We're + // only interested in a tracker that has jobs but isn't using any of + // the slots. + if (!MesosTracker.this.active || MesosTracker.this.jobs.isEmpty()) { + scheduleIdleCheck(); + return; + } - Collection taskTrackers = scheduler.jobTracker.taskTrackers(); - for (TaskTrackerStatus status : taskTrackers) { - HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); - if (host.toString().equals(MesosTracker.this.host.toString())) { - idleMapSlots += status.getAvailableMapSlots(); - idleReduceSlots += status.getAvailableReduceSlots(); - break; + // If the tracker has been idle for too long, kill it. + if (MesosTracker.this.idleCounter >= MesosTracker.this.idleCheckMax) { + LOG.info("Killing idle tasktracker " + MesosTracker.this.host); + MesosTracker.this.scheduler.killTracker(MesosTracker.this); + scheduleIdleCheck(); + return; } - } - trackerIsIdle = idleMapSlots == MesosTracker.this.mapSlots && - idleReduceSlots == MesosTracker.this.reduceSlots; + // Calculate the number of map and reduce slots that are currently + // occupied on the task tracker. 
+ long occupiedMapSlots = 0; + long occupiedReduceSlots = 0; + Collection taskTrackers = scheduler.jobTracker.taskTrackers(); + for (TaskTrackerStatus status : taskTrackers) { + HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); + if (host.toString().equals(MesosTracker.this.host.toString())) { + occupiedMapSlots += status.countOccupiedMapSlots(); + occupiedReduceSlots += status.countOccupiedMapSlots(); + break; + } + } - if (trackerIsIdle) { - LOG.info("TaskTracker appears idle right now: " + MesosTracker.this.host); - MesosTracker.this.idleCounter += 1; - } else { - if (MesosTracker.this.idleCounter > 0) { - LOG.info("TaskTracker is no longer idle: " + MesosTracker.this.host); + // If there are zero slots occupied (either map OR reduce slots) then + // we class the tracker as idle. + if (occupiedMapSlots == 0 && occupiedReduceSlots == 0) { + LOG.info("TaskTracker appears idle right now: " + MesosTracker.this.host); + MesosTracker.this.idleCounter += 1; + } else { + if (MesosTracker.this.idleCounter > 0) { + LOG.info("TaskTracker is no longer idle: " + MesosTracker.this.host); + } + MesosTracker.this.idleCounter = 0; } - MesosTracker.this.idleCounter = 0; - } - scheduleIdleCheck(); + scheduleIdleCheck(); + } } }, MesosTracker.this.idleCheckInterval, TimeUnit.SECONDS); } From dc3e288ccce15c43991be7ed625393575c2f271e Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Thu, 21 May 2015 00:44:56 +0100 Subject: [PATCH 10/14] Split MAP and REDUCE tasks into individual mesos tasks This commit splits out the resources for MAP and REDUCE slots into two Mesos tasks instead of one. This allows the idle-slot tracking to operate on MAP and REDUCE slots individually further increasing our ability to release idle resources faster. 
--- .../apache/hadoop/mapred/MesosExecutor.java | 228 ++++++++++++------ .../apache/hadoop/mapred/MesosScheduler.java | 75 +++--- .../apache/hadoop/mapred/MesosTracker.java | 129 ++++++---- .../apache/hadoop/mapred/ResourcePolicy.java | 112 +++++---- 4 files changed, 351 insertions(+), 193 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 7bba393..2aa1538 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -2,6 +2,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.mesos.Executor; import org.apache.mesos.ExecutorDriver; import org.apache.mesos.MesosExecutorDriver; @@ -11,8 +12,15 @@ import java.io.*; +import java.util.Map; + import java.lang.reflect.Field; -import java.lang.ReflectiveOperationException; +import java.lang.reflect.Method; +import java.lang.reflect.InvocationTargetException; + +import java.lang.IllegalAccessException; +import java.lang.NoSuchFieldException; +import java.lang.NoSuchMethodException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; @@ -24,8 +32,12 @@ public class MesosExecutor implements Executor { protected final ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(1); + + private boolean suicideTimerScheduled = false; private SlaveInfo slaveInfo; private TaskTracker taskTracker; + private TaskID mapTaskId; + private TaskID reduceTaskId; public static void main(String[] args) { MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor()); @@ -55,52 +67,79 @@ public void disconnected(ExecutorDriver driver) { public void launchTask(final ExecutorDriver driver, final TaskInfo task) { LOG.info("Launching task : " + task.getTaskId().getValue()); - // Get configuration from task data (prepared 
by the JobTracker). - JobConf conf = configure(task); + synchronized(this) { - // NOTE: We need to manually set the context class loader here because, - // the TaskTracker is unable to find LoginModule class otherwise. - Thread.currentThread().setContextClassLoader( - TaskTracker.class.getClassLoader()); + // Keep track of all the IDs we've been asked to monitor + if (task.getTaskId().getValue().endsWith("_Map")) { + mapTaskId = task.getTaskId(); + } else if (task.getTaskId().getValue().endsWith("_Reduce")) { + reduceTaskId = task.getTaskId(); + } else { + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(task.getTaskId()) + .setState(TaskState.TASK_LOST).build()); - try { - taskTracker = new TaskTracker(conf); - } catch (IOException | InterruptedException e) { - LOG.fatal("Failed to start TaskTracker", e); - System.exit(1); - } + return; + } - // Spin up a TaskTracker in a new thread. - new Thread("TaskTracker Run Thread") { - @Override - public void run() { - try { - taskTracker.run(); + if (taskTracker == null) { + // Get configuration from task data (prepared by the JobTracker). + JobConf conf = configure(task); - // Send a TASK_FINISHED status update. - // We do this here because we want to send it in a separate thread - // than was used to call killTask(). - driver.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(task.getTaskId()) - .setState(TaskState.TASK_FINISHED) - .build()); + // NOTE: We need to manually set the context class loader here because, + // the TaskTracker is unable to find LoginModule class otherwise. + Thread.currentThread().setContextClassLoader( + TaskTracker.class.getClassLoader()); - // Give some time for the update to reach the slave. - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - LOG.error("Failed to sleep TaskTracker thread", e); - } - - // Stop the executor. 
- driver.stop(); - } catch (Throwable t) { - LOG.fatal("Caught exception, committing suicide.", t); - driver.stop(); + try { + taskTracker = new TaskTracker(conf); + } catch (IOException | InterruptedException e) { + LOG.fatal("Failed to start TaskTracker", e); System.exit(1); } + + // Spin up a TaskTracker in a new thread. + new Thread("TaskTracker Run Thread") { + @Override + public void run() { + try { + taskTracker.run(); + + // Send a TASK_FINISHED status update. + // We do this here because we want to send it in a separate thread + // than was used to call killTask(). + if (mapTaskId != null) { + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(mapTaskId) + .setState(TaskState.TASK_FINISHED) + .build()); + } + + if (reduceTaskId != null) { + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(reduceTaskId) + .setState(TaskState.TASK_FINISHED) + .build()); + } + + // Give some time for the update to reach the slave. + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + LOG.error("Failed to sleep TaskTracker thread", e); + } + + // Stop the executor. 
+ driver.stop(); + } catch (Throwable t) { + LOG.fatal("Caught exception, committing suicide.", t); + driver.stop(); + System.exit(1); + } + } + }.start(); } - }.start(); + } driver.sendStatusUpdate(TaskStatus.newBuilder() .setTaskId(task.getTaskId()) @@ -110,35 +149,50 @@ public void run() { @Override public void killTask(final ExecutorDriver driver, final TaskID taskId) { LOG.info("Killing task : " + taskId.getValue()); - if (taskTracker != null) { - LOG.info("Revoking task tracker map/reduce slots"); - - // terminate the task launchers - try { - killLauncher(taskTracker, "mapLauncher"); - killLauncher(taskTracker, "reduceLauncher"); - } catch (ReflectiveOperationException e) { - LOG.fatal("Failed updating map slots due to error with reflection", e); - } - // Configure the new slot counts on the task tracker - taskTracker.setMaxMapSlots(0); - taskTracker.setMaxReduceSlots(0); + new Thread("TaskTrackerKillThread") { + @Override + public void run() { + + // Commit suicide when no jobs are running + scheduleSuicideTimer(); - // commit suicide when no jobs are running - scheduleSuicideTimer(); + if (mapTaskId != null && taskId.equals(mapTaskId)) { + LOG.info("Revoking task tracker MAP slots"); + + // Revoke the slots from the task tracker + try { + revokeSlots(taskTracker, TaskType.MAP); + } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + LOG.error("Caught exception revoking MAP slots: ", e); + } + + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(taskId) + .setState(TaskState.TASK_FINISHED) + .build()); + + mapTaskId = null; + + } else if (reduceTaskId != null && taskId.equals(reduceTaskId)) { + LOG.info("Revoking task tracker REDUCE slots"); + + // Revoke the slots from the task tracker + try { + revokeSlots(taskTracker, TaskType.REDUCE); + } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + LOG.error("Caught exception revoking 
REDUCE slots: ", e); + } - // Send the TASK_FINISHED status - new Thread("TaskFinishedUpdate") { - @Override - public void run() { driver.sendStatusUpdate(TaskStatus.newBuilder() .setTaskId(taskId) .setState(TaskState.TASK_FINISHED) .build()); + + reduceTaskId = null; } - }.start(); - } + } + }.start(); } @Override @@ -191,28 +245,54 @@ private JobConf configure(final TaskInfo task) { } /** - * This is a hack to overcome lack of accessibility of the launcher. Will solicit feedback from Hadoop list. - * - * @param tracker tracker with launcher we want to kill - * @param name name of the field containing the launcher - * @throws NoSuchFieldException - * @throws IllegalAccessException + * revokeSlots will take away all slots of the given task type from + * the running task tracker and as a precaution, fail any tasks that are + * running in those slots. */ - private void killLauncher(TaskTracker tracker, String name) throws NoSuchFieldException, IllegalAccessException { - Field f = tracker.getClass().getDeclaredField(name); - f.setAccessible(true); - - // Kill the current map task launcher - TaskTracker.TaskLauncher launcher = ((TaskTracker.TaskLauncher) f.get(tracker)); - launcher.notifySlots(); - launcher.interrupt(); + private void revokeSlots(TaskTracker tracker, TaskType type) throws NoSuchFieldException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + synchronized(tracker) { + if (type == TaskType.MAP) { + taskTracker.setMaxMapSlots(0); + } else if (type == TaskType.REDUCE) { + taskTracker.setMaxReduceSlots(0); + } + + // Nasty horrible hacks to get inside the task tracker and take over some + // of the state handling. Even if we were to subclass the task tracker + // these methods are all private so we wouldn't be able to use them. 
+ Field f = tracker.getClass().getDeclaredField("tasks"); + f.setAccessible(true); + Method m = tracker.getClass().getDeclaredMethod("purgeTask", + TaskTracker.TaskInProgress.class, boolean.class, boolean.class); + m.setAccessible(true); + + // Here we're basically asking the task tracker to purge and kill any + // currently running tasks that match the given task type. This will + // clean up all various bits of state inside the task tracker and also + // terminate the relevant task runners (which ultimately are child JVMs). + Map tasks = + (Map) f.get(tracker); + for (TaskTracker.TaskInProgress tip : tasks.values()) { + Task task = tip.getTask(); + if (type == TaskType.MAP && task instanceof MapTask) { + m.invoke(tip, false, false); + } + } + } } protected void scheduleSuicideTimer() { + + if (suicideTimerScheduled) { + return; + } + + suicideTimerScheduled = true; timerScheduler.schedule(new Runnable() { @Override public void run() { if (taskTracker == null) { + suicideTimerScheduled = false; return; } diff --git a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java index 8015c25..1ebfa66 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java @@ -7,9 +7,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.mesos.MesosSchedulerDriver; import org.apache.mesos.Protos; import org.apache.mesos.Protos.*; +import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; import org.apache.mesos.hadoop.Metrics; @@ -111,7 +113,7 @@ public void jobUpdated(JobChangeEvent event) { for (String hostname : flakyTrackers) { for (MesosTracker mesosTracker : mesosTrackers.values()) { if 
(mesosTracker.host.getHostName().startsWith(hostname)) { - LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host " + mesosTracker.host + " because it has been marked as flaky"); + LOG.info("Killing tracker on host " + mesosTracker.host + " because it has been marked as flaky"); if (metrics != null) { metrics.flakyTrackerKilledMeter.mark(); } @@ -141,7 +143,7 @@ public void jobUpdated(JobChangeEvent event) { // If the TaskTracker doesn't have any running job tasks assigned, kill it. if (mesosTracker.jobs.isEmpty() && mesosTracker.active) { - LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host " + mesosTracker.host + " because it is no longer needed"); + LOG.info("Killing tracker on host " + mesosTracker.host + " because it is no longer needed"); killTracker(mesosTracker); } @@ -196,18 +198,6 @@ public synchronized void statusUpdate(SchedulerDriver schedulerDriver, Protos.Ta case TASK_FAILED: case TASK_KILLED: case TASK_LOST: - // Make a copy to iterate over keys and delete values. - Set trackers = new HashSet<>(mesosTrackers.keySet()); - - // Remove the task from the map. - for (HttpHost tracker : trackers) { - if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) { - LOG.info("Removing terminated TaskTracker: " + tracker); - mesosTrackers.get(tracker).stop(); - mesosTrackers.remove(tracker); - } - } - break; case TASK_STAGING: case TASK_STARTING: case TASK_RUNNING: @@ -243,6 +233,8 @@ public synchronized void slaveLost(SchedulerDriver schedulerDriver, SlaveID slav @Override public synchronized void executorLost(SchedulerDriver schedulerDriver, ExecutorID executorID, SlaveID slaveID, int status) { LOG.warn("Executor " + executorID.getValue() + " lost with status " + status + " on slave " + slaveID); + + // TODO(tarnfeld): If the executor is lost what do we do? 
} @Override @@ -260,19 +252,28 @@ public List assignTasks(TaskTracker taskTracker) throws IOException { if (tasks != null) { HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(), taskTracker.getStatus().getHttpPort()); MesosTracker mesosTracker = mesosTrackers.get(tracker); - synchronized (this) { - if (mesosTracker != null) { - if (mesosTracker.stopped) { - // Make sure we're not asked to assign tasks to any task trackers that have - // been stopped. This could happen while the task tracker has not been - // removed from the cluster e.g still in the heartbeat timeout period. - LOG.info("Asked to assign tasks to stopped tracker " + tracker + "."); - return null; - } else { - // Keep track of which TaskTracker contains which tasks. - for (Task task : tasks) { - mesosTracker.jobs.add(task.getJobID()); + if (mesosTracker != null) { + synchronized (this) { + for (Iterator iterator = tasks.iterator(); iterator.hasNext();) { + + // Throw away any task types that don't match up with the current known + // slot allocation of the tracker. We do this here because when we change + // the slot allocation of a running Task Tracker it can take time for + // this information to propagate around the system and we can preemptively + // avoid scheduling tasks to task trackers we know not to have capacity. + Task task = iterator.next(); + if (task instanceof MapTask && mesosTracker.mapSlots == 0) { + LOG.debug("Removed map task from TT assignment due to mismatching slots"); + iterator.remove(); + continue; + } else if (task instanceof ReduceTask && mesosTracker.reduceSlots == 0) { + LOG.debug("Removed reduce task from TT assignment due to mismatching slots"); + iterator.remove(); + continue; } + + // Keep track of which TaskTracker contains which tasks. 
+ mesosTracker.jobs.add(task.getJobID()); } } } @@ -321,15 +322,27 @@ public int getPendingTasks(TaskInProgress[] tasks) { } public void killTracker(MesosTracker tracker) { + killTrackerSlots(tracker, TaskType.MAP); + killTrackerSlots(tracker, TaskType.REDUCE); + } + + // killTrackerSlots will ask the given MesosTraker to revoke + // the allocated task slots, for the given type of slot (MAP/REDUCE). + public void killTrackerSlots(MesosTracker tracker, TaskType type) { if (metrics != null) { metrics.killMeter.mark(); } - synchronized (this) { - driver.killTask(tracker.taskId); - tracker.stop(); - if (mesosTrackers.get(tracker.host) == tracker) { - mesosTrackers.remove(tracker.host); + synchronized (this) { + TaskID taskId = tracker.getTaskId(type); + if (taskId != null) { + driver.killTask(taskId); + + if (type == TaskType.MAP) { + tracker.mapSlots = 0; + } else if (type == TaskType.REDUCE) { + tracker.reduceSlots = 0; + } } } } diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index eb73660..2c9d520 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -3,6 +3,7 @@ import org.apache.commons.httpclient.HttpHost; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.mesos.Protos.TaskID; import java.util.Collection; @@ -18,23 +19,27 @@ public class MesosTracker { public static final Log LOG = LogFactory.getLog(MesosScheduler.class); public volatile HttpHost host; - public TaskID taskId; + public TaskID mapTaskId; + public TaskID reduceTaskId; public long mapSlots; public long reduceSlots; - public volatile long idleCounter = 0; + // Number of idle check cycles all map slots are idle + public volatile long idleMapCounter = 0; + // Number of idle check cycles all reduce slots are idle + public volatile long 
idleReduceCounter = 0; public volatile long idleCheckInterval = 0; public volatile long idleCheckMax = 0; public volatile boolean active = false; // Set once tracked by the JobTracker. - public volatile boolean stopped = false; public volatile MesosScheduler scheduler; // Tracks Hadoop jobs running on the tracker. public Set jobs = Collections.newSetFromMap(new ConcurrentHashMap()); public com.codahale.metrics.Timer.Context context; - public MesosTracker(HttpHost host, TaskID taskId, long mapSlots, - long reduceSlots, MesosScheduler scheduler) { + public MesosTracker(HttpHost host, TaskID mapTaskId, TaskID reduceTaskId, + long mapSlots, long reduceSlots, MesosScheduler scheduler) { this.host = host; - this.taskId = taskId; + this.mapTaskId = mapTaskId; + this.reduceTaskId = reduceTaskId; this.mapSlots = mapSlots; this.reduceSlots = reduceSlots; this.scheduler = scheduler; @@ -54,6 +59,16 @@ public MesosTracker(HttpHost host, TaskID taskId, long mapSlots, } } + public TaskID getTaskId(TaskType type) { + if (type == TaskType.MAP) { + return mapTaskId; + } else if (type == TaskType.REDUCE) { + return reduceTaskId; + } + + return null; + } + protected void scheduleStartupTimer() { scheduler.scheduleTimer(new Runnable() { @Override @@ -83,8 +98,14 @@ public void run() { if (MesosTracker.this.scheduler.metrics != null) { MesosTracker.this.scheduler.metrics.launchTimeout.mark(); } + LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " + MesosScheduler.LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it"); + + // Kill the MAP and REDUCE slot tasks. This doesn't directly kill the + // task tracker but it will result in the task tracker receiving no + // tasks and ultimately lead to it's death. Best case the task is broken + // and it will never come up on Mesos. 
MesosTracker.this.scheduler.killTracker(MesosTracker.this); } }, MesosScheduler.LAUNCH_TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -120,11 +141,6 @@ protected void scheduleIdleCheck() { public void run() { synchronized (MesosTracker.this.scheduler) { - // Stop the idle check timer if the scheduler has been stopped. - if (MesosTracker.this.stopped) { - return; - } - // If the task tracker isn't active, wait until it is active. // If the task tracker has no jobs assigned to it, ignore it. We're // only interested in a tracker that has jobs but isn't using any of @@ -134,38 +150,13 @@ public void run() { return; } - // If the tracker has been idle for too long, kill it. - if (MesosTracker.this.idleCounter >= MesosTracker.this.idleCheckMax) { - LOG.info("Killing idle tasktracker " + MesosTracker.this.host); - MesosTracker.this.scheduler.killTracker(MesosTracker.this); - scheduleIdleCheck(); - return; + // Perform the idle checks for map and reduce slots + if (MesosTracker.this.mapSlots > 0) { + idleMapCheck(); } - // Calculate the number of map and reduce slots that are currently - // occupied on the task tracker. - long occupiedMapSlots = 0; - long occupiedReduceSlots = 0; - Collection taskTrackers = scheduler.jobTracker.taskTrackers(); - for (TaskTrackerStatus status : taskTrackers) { - HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); - if (host.toString().equals(MesosTracker.this.host.toString())) { - occupiedMapSlots += status.countOccupiedMapSlots(); - occupiedReduceSlots += status.countOccupiedMapSlots(); - break; - } - } - - // If there are zero slots occupied (either map OR reduce slots) then - // we class the tracker as idle. 
- if (occupiedMapSlots == 0 && occupiedReduceSlots == 0) { - LOG.info("TaskTracker appears idle right now: " + MesosTracker.this.host); - MesosTracker.this.idleCounter += 1; - } else { - if (MesosTracker.this.idleCounter > 0) { - LOG.info("TaskTracker is no longer idle: " + MesosTracker.this.host); - } - MesosTracker.this.idleCounter = 0; + if (MesosTracker.this.reduceSlots > 0) { + idleReduceCheck(); } scheduleIdleCheck(); @@ -174,11 +165,57 @@ public void run() { }, MesosTracker.this.idleCheckInterval, TimeUnit.SECONDS); } - public void stop() { - active = false; - stopped = true; - if (context != null) { - context.stop(); + protected void idleMapCheck() { + + // If the map slots has been idle for too long, kill them. + if (this.idleMapCounter >= MesosTracker.this.idleCheckMax) { + LOG.info("Killing MAP slots on idle Task Tracker " + MesosTracker.this.host); + MesosTracker.this.scheduler.killTrackerSlots(MesosTracker.this, TaskType.MAP); + return; + } + + long occupiedMapSlots = 0; + Collection taskTrackers = scheduler.jobTracker.taskTrackers(); + for (TaskTrackerStatus status : taskTrackers) { + HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); + if (host.toString().equals(MesosTracker.this.host.toString())) { + occupiedMapSlots += status.countOccupiedMapSlots(); + break; + } + } + + if (occupiedMapSlots == 0) { + LOG.info("TaskTracker MAP slots appear idle right now: " + MesosTracker.this.host); + MesosTracker.this.idleMapCounter += 1; + } else { + MesosTracker.this.idleMapCounter = 0; + } + } + + protected void idleReduceCheck() { + + // If the reduce slots has been idle for too long, kill them. 
+ if (this.idleReduceCounter >= MesosTracker.this.idleCheckMax) { + LOG.info("Killing REDUCE slots on idle Task Tracker " + MesosTracker.this.host); + MesosTracker.this.scheduler.killTrackerSlots(MesosTracker.this, TaskType.REDUCE); + return; + } + + long occupiedReduceSlots = 0; + Collection taskTrackers = scheduler.jobTracker.taskTrackers(); + for (TaskTrackerStatus status : taskTrackers) { + HttpHost host = new HttpHost(status.getHost(), status.getHttpPort()); + if (host.toString().equals(MesosTracker.this.host.toString())) { + occupiedReduceSlots += status.countOccupiedReduceSlots(); + break; + } + } + + if (occupiedReduceSlots == 0) { + LOG.info("TaskTracker REDUCE slots appear idle right now: " + MesosTracker.this.host); + MesosTracker.this.idleReduceCounter += 1; + } else { + MesosTracker.this.idleReduceCounter = 0; } } } diff --git a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java index 5d1e643..95b4b37 100644 --- a/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java +++ b/src/main/java/org/apache/hadoop/mapred/ResourcePolicy.java @@ -331,7 +331,7 @@ public void resourceOffers(SchedulerDriver schedulerDriver, List offers) ExecutorInfo executor = ExecutorInfo .newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue( - "executor_" + taskId.getValue())) + "Executor_" + taskId.getValue())) .setName("Hadoop TaskTracker") .setSource(taskId.getValue()) .addResources( @@ -370,51 +370,34 @@ public void resourceOffers(SchedulerDriver schedulerDriver, List offers) continue; } - // Create the TaskTracker TaskInfo - TaskInfo trackerTaskInfo = TaskInfo - .newBuilder() - .setName(taskId.getValue()) - .setTaskId(taskId) - .setSlaveId(offer.getSlaveId()) - .addResources( - Resource - .newBuilder() - .setName("ports") - .setType(Value.Type.RANGES) - .setRole(portsRole) - .setRanges( - Value.Ranges - .newBuilder() - .addRange(Value.Range.newBuilder() - .setBegin(httpAddress.getPort()) - 
.setEnd(httpAddress.getPort())) - .addRange(Value.Range.newBuilder() - .setBegin(reportAddress.getPort()) - .setEnd(reportAddress.getPort())))) - .addResources( - Resource - .newBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setRole(cpuRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskCpus - containerCpus))) - .addResources( - Resource - .newBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setRole(memRole) - .setScalar(Value.Scalar.newBuilder().setValue(taskMem - containerCpus))) - .setData(taskData) - .setExecutor(executor) - .build(); + List tasks = new ArrayList(); + TaskID mapTaskId = null; + TaskID reduceTaskId = null; + + if (mapSlots > 0) { + TaskInfo mapTask = buildTaskInfo(executor, taskId.getValue() + "_Map", offer, + httpAddress.getPort(), reportAddress.getPort(), mapSlots, taskData, + portsRole, cpuRole, memRole); + + mapTaskId = mapTask.getTaskId(); + tasks.add(mapTask); + } + + if (reduceSlots > 0) { + TaskInfo reduceTask = buildTaskInfo(executor, taskId.getValue() + "_Reduce", offer, + httpAddress.getPort(), reportAddress.getPort(), reduceSlots, taskData, + portsRole, cpuRole, memRole); + + reduceTaskId = reduceTask.getTaskId(); + tasks.add(reduceTask); + } // Add this tracker to Mesos tasks. 
- scheduler.mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId, - mapSlots, reduceSlots, scheduler)); + scheduler.mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, + mapTaskId, reduceTaskId, mapSlots, reduceSlots, scheduler)); // Launch the task - schedulerDriver.launchTasks(Arrays.asList(offer.getId()), Arrays.asList(trackerTaskInfo)); + schedulerDriver.launchTasks(Arrays.asList(offer.getId()), tasks); neededMapSlots -= mapSlots; neededReduceSlots -= reduceSlots; @@ -431,6 +414,51 @@ public void resourceOffers(SchedulerDriver schedulerDriver, List offers) } } + protected TaskInfo buildTaskInfo(ExecutorInfo executor, String taskId, Offer offer, + Integer httpPort, Integer reportPort, long slots, ByteString taskData, + String portsRole, String cpuRole, String memRole) { + + TaskInfo taskInfo = TaskInfo + .newBuilder() + .setName(taskId) + .setTaskId(TaskID.newBuilder().setValue(taskId)) + .setSlaveId(offer.getSlaveId()) + .addResources( + Resource + .newBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRole(portsRole) + .setRanges( + Value.Ranges + .newBuilder() + .addRange(Value.Range.newBuilder() + .setBegin(httpPort) + .setEnd(httpPort)) + .addRange(Value.Range.newBuilder() + .setBegin(reportPort) + .setEnd(reportPort)))) + .addResources( + Resource + .newBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setRole(cpuRole) + .setScalar(Value.Scalar.newBuilder().setValue(slotCpus * slots))) + .addResources( + Resource + .newBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setRole(memRole) + .setScalar(Value.Scalar.newBuilder().setValue(slotMem * slots))) + .setData(taskData) + .setExecutor(executor) + .build(); + + return taskInfo; + } + public void computeNeededSlots(List jobsInProgress, Collection taskTrackers) { // Compute the number of pending maps and reduces. 
From 05b7acd9780a95b52dfd5981cba8af6d59102542 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Thu, 21 May 2015 02:27:08 +0100 Subject: [PATCH 11/14] Remove killTasks() synchronized block to avoid a deadlock In one of the code paths (related to flaky trackers) we were using synchronized() in nested function calls, against the same object. This is not needed, and causes a deadlock. --- .../apache/hadoop/mapred/MesosScheduler.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java index 1ebfa66..2cf9b6a 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java @@ -333,16 +333,14 @@ public void killTrackerSlots(MesosTracker tracker, TaskType type) { metrics.killMeter.mark(); } - synchronized (this) { - TaskID taskId = tracker.getTaskId(type); - if (taskId != null) { - driver.killTask(taskId); - - if (type == TaskType.MAP) { - tracker.mapSlots = 0; - } else if (type == TaskType.REDUCE) { - tracker.reduceSlots = 0; - } + TaskID taskId = tracker.getTaskId(type); + if (taskId != null) { + driver.killTask(taskId); + + if (type == TaskType.MAP) { + tracker.mapSlots = 0; + } else if (type == TaskType.REDUCE) { + tracker.reduceSlots = 0; } } } From b9ff37696a2271e690d231dda30a473567482e38 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Thu, 21 May 2015 02:43:24 +0100 Subject: [PATCH 12/14] Don't synchronized() inside the idle check callback If we synchronized() against the scheduler here and we grab hold of the lock, at the same time in another thread a callback from Mesos comes in and that too also calls synchronized(). This behaviour causes the Mesos Scheduler driver to lock up because it's single threaded.
In the event that the former then decides to kill a mesos task, we'll see a deadlock because the killTask() message can't be sent while the driver is waiting on another callback. --- .../apache/hadoop/mapred/MesosTracker.java | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java index 2c9d520..c215729 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosTracker.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosTracker.java @@ -139,28 +139,25 @@ protected void scheduleIdleCheck() { scheduler.scheduleTimer(new Runnable() { @Override public void run() { - synchronized (MesosTracker.this.scheduler) { - - // If the task tracker isn't active, wait until it is active. - // If the task tracker has no jobs assigned to it, ignore it. We're - // only interested in a tracker that has jobs but isn't using any of - // the slots. - if (!MesosTracker.this.active || MesosTracker.this.jobs.isEmpty()) { - scheduleIdleCheck(); - return; - } - - // Perform the idle checks for map and reduce slots - if (MesosTracker.this.mapSlots > 0) { - idleMapCheck(); - } + // If the task tracker isn't active, wait until it is active. + // If the task tracker has no jobs assigned to it, ignore it. We're + // only interested in a tracker that has jobs but isn't using any of + // the slots. 
+ if (!MesosTracker.this.active || MesosTracker.this.jobs.isEmpty()) { + scheduleIdleCheck(); + return; + } - if (MesosTracker.this.reduceSlots > 0) { - idleReduceCheck(); - } + // Perform the idle checks for map and reduce slots + if (MesosTracker.this.mapSlots > 0) { + idleMapCheck(); + } - scheduleIdleCheck(); + if (MesosTracker.this.reduceSlots > 0) { + idleReduceCheck(); } + + scheduleIdleCheck(); } }, MesosTracker.this.idleCheckInterval, TimeUnit.SECONDS); } From 0d54030a8a6b10f53ef76af1b4a1d767aea81e06 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Thu, 21 May 2015 11:08:36 +0100 Subject: [PATCH 13/14] Revert back to using a simpler TaskLauncher interrupt model --- .../apache/hadoop/mapred/MesosExecutor.java | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index 2aa1538..ccaa059 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -16,11 +16,9 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.lang.reflect.InvocationTargetException; import java.lang.IllegalAccessException; import java.lang.NoSuchFieldException; -import java.lang.NoSuchMethodException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; @@ -163,7 +161,7 @@ public void run() { // Revoke the slots from the task tracker try { revokeSlots(taskTracker, TaskType.MAP); - } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + } catch (NoSuchFieldException | IllegalAccessException e) { LOG.error("Caught exception revoking MAP slots: ", e); } @@ -180,7 +178,7 @@ public void run() { // Revoke the slots from the task tracker try { revokeSlots(taskTracker, TaskType.REDUCE); - } catch (NoSuchFieldException | NoSuchMethodException | 
IllegalAccessException | InvocationTargetException e) { + } catch (NoSuchFieldException | IllegalAccessException e) { LOG.error("Caught exception revoking REDUCE slots: ", e); } @@ -249,35 +247,36 @@ private JobConf configure(final TaskInfo task) { * the running task tracker and as a precaution, fail any tasks that are * running in those slots. */ - private void revokeSlots(TaskTracker tracker, TaskType type) throws NoSuchFieldException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { + private void revokeSlots(TaskTracker tracker, TaskType type) throws NoSuchFieldException, IllegalAccessException { synchronized(tracker) { + + String launcherProp = ""; + int maxSlots = 0; + if (type == TaskType.MAP) { taskTracker.setMaxMapSlots(0); + launcherProp = "mapLauncher"; + maxSlots = tracker.getMaxCurrentMapTasks(); } else if (type == TaskType.REDUCE) { taskTracker.setMaxReduceSlots(0); + launcherProp = "reduceLauncher"; + maxSlots = tracker.getMaxCurrentReduceTasks(); } // Nasty horrible hacks to get inside the task tracker and take over some // of the state handling. Even if we were to subclass the task tracker // these methods are all private so we wouldn't be able to use them. - Field f = tracker.getClass().getDeclaredField("tasks"); + Field f = tracker.getClass().getDeclaredField(launcherProp); f.setAccessible(true); - Method m = tracker.getClass().getDeclaredMethod("purgeTask", - TaskTracker.TaskInProgress.class, boolean.class, boolean.class); - m.setAccessible(true); - - // Here we're basically asking the task tracker to purge and kill any - // currently running tasks that match the given task type. This will - // clean up all various bits of state inside the task tracker and also - // terminate the relevant task runners (which ultimately are child JVMs). 
- Map tasks = - (Map) f.get(tracker); - for (TaskTracker.TaskInProgress tip : tasks.values()) { - Task task = tip.getTask(); - if (type == TaskType.MAP && task instanceof MapTask) { - m.invoke(tip, false, false); - } - } + + TaskTracker.TaskLauncher launcher = + (TaskTracker.TaskLauncher) f.get(tracker); + + // Here we add a negative amount of slots (bringing the launcher to zero) + // which causes the launcher to clean up any tasks in the launch queue + // and then we kill the thread to stop it doing anything else. + launcher.addFreeSlots(-maxSlots); + launcher.interrupt(); } } From 3f8c161a99694a9265a14114cace31f2ae032235 Mon Sep 17 00:00:00 2001 From: Tom Arnfeld Date: Thu, 21 May 2015 12:39:47 +0100 Subject: [PATCH 14/14] Ensure the suicide timer repeats itself --- .../java/org/apache/hadoop/mapred/MesosExecutor.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java index ccaa059..bd9b95b 100644 --- a/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java +++ b/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java @@ -153,7 +153,10 @@ public void killTask(final ExecutorDriver driver, final TaskID taskId) { public void run() { // Commit suicide when no jobs are running - scheduleSuicideTimer(); + if (!suicideTimerScheduled) { + scheduleSuicideTimer(); + suicideTimerScheduled = true; + } if (mapTaskId != null && taskId.equals(mapTaskId)) { LOG.info("Revoking task tracker MAP slots"); @@ -281,12 +284,6 @@ private void revokeSlots(TaskTracker tracker, TaskType type) throws NoSuchFieldE } protected void scheduleSuicideTimer() { - - if (suicideTimerScheduled) { - return; - } - - suicideTimerScheduled = true; timerScheduler.schedule(new Runnable() { @Override public void run() {