From 4050d661c5f0a48a5a043ec932d98707f4606dd5 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Mon, 1 Aug 2011 13:54:48 -0700
Subject: Updated to newest Mesos API, which includes better memory accounting
 by specifying per-executor memory.

---
 core/lib/mesos.jar                             | Bin 124677 -> 126006 bytes
 core/src/main/scala/spark/Executor.scala       |  13 ++--
 core/src/main/scala/spark/Job.scala            |   3 +-
 core/src/main/scala/spark/MesosScheduler.scala | 103 ++++++++++++++++++-------
 core/src/main/scala/spark/SimpleJob.scala      |  26 ++-----
 run                                            |   3 +-
 6 files changed, 94 insertions(+), 54 deletions(-)

diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar
index 171a5ac02c..f1fde967c4 100644
Binary files a/core/lib/mesos.jar and b/core/lib/mesos.jar differ
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 80c95dcb99..a2af70989c 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -53,7 +53,8 @@ class Executor extends org.apache.mesos.Executor with Logging {
   class TaskRunner(desc: TaskDescription, d: ExecutorDriver)
   extends Runnable {
     override def run() = {
-      logInfo("Running task ID " + desc.getTaskId)
+      val tid = desc.getTaskId.getValue
+      logInfo("Running task ID " + tid)
       d.sendStatusUpdate(TaskStatus.newBuilder()
         .setTaskId(desc.getTaskId)
         .setState(TaskState.TASK_RUNNING)
@@ -65,7 +66,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
         val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
         for (gen <- task.generation) // Update generation if any is set
           env.mapOutputTracker.updateGeneration(gen)
-        val value = task.run(desc.getTaskId.getValue.toInt)
+        val value = task.run(tid.toInt)
         val accumUpdates = Accumulators.values
         val result = new TaskResult(value, accumUpdates)
         d.sendStatusUpdate(TaskStatus.newBuilder()
@@ -73,7 +74,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
           .setState(TaskState.TASK_FINISHED)
           .setData(ByteString.copyFrom(Utils.serialize(result)))
           .build())
-        logInfo("Finished task ID " + desc.getTaskId)
+        logInfo("Finished task ID " + tid)
       } catch {
         case ffe: FetchFailedException => {
           val reason = ffe.toTaskEndReason
@@ -85,7 +86,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
         }
         case t: Throwable => {
           // TODO: Handle errors in tasks less dramatically
-          logError("Exception in task ID " + desc.getTaskId, t)
+          logError("Exception in task ID " + tid, t)
           System.exit(1)
         }
       }
@@ -144,8 +145,8 @@ class Executor extends org.apache.mesos.Executor with Logging {
     logError("Error from Mesos: %s (code %d)".format(message, code))
   }
 
-  override def killTask(d: ExecutorDriver, tid: TaskID) {
-    logWarning("Mesos asked us to kill task " + tid + "; ignoring (not yet implemented)")
+  override def killTask(d: ExecutorDriver, t: TaskID) {
+    logWarning("Mesos asked us to kill task " + t.getValue + "; ignoring (not yet implemented)")
   }
 
   override def shutdown(d: ExecutorDriver) {}
diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala
index 9d75c08398..acff8ce561 100644
--- a/core/src/main/scala/spark/Job.scala
+++ b/core/src/main/scala/spark/Job.scala
@@ -8,8 +8,7 @@ import org.apache.mesos.Protos._
  * job by implementing various callbacks.
  */
 abstract class Job(jobId: Int) {
-  def slaveOffer(s: SlaveOffer, availableCpus: Double, availableMem: Double)
-    : Option[TaskDescription]
+  def slaveOffer(s: SlaveOffer, availableCpus: Double): Option[TaskDescription]
 
   def statusUpdate(t: TaskStatus): Unit
 
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 7f43fffebf..9ca316d953 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -33,15 +33,25 @@ extends MScheduler with DAGScheduler with Logging
     "SPARK_LIBRARY_PATH"
   )
 
+  // Memory used by each executor (in megabytes)
+  val EXECUTOR_MEMORY = {
+    if (System.getenv("SPARK_MEM") != null)
+      memoryStringToMb(System.getenv("SPARK_MEM"))
+      // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+    else
+      512
+  }
+
   // Lock used to wait for scheduler to be registered
   private var isRegistered = false
   private val registeredLock = new Object()
 
-  private var activeJobs = new HashMap[Int, Job]
-  private var activeJobsQueue = new Queue[Job]
+  private val activeJobs = new HashMap[Int, Job]
+  private val activeJobsQueue = new Queue[Job]
 
-  private var taskIdToJobId = new HashMap[String, Int]
-  private var jobTasks = new HashMap[Int, HashSet[String]]
+  private val taskIdToJobId = new HashMap[String, Int]
+  private val taskIdToSlaveId = new HashMap[String, String]
+  private val jobTasks = new HashMap[Int, HashSet[String]]
 
   // Incrementing job and task IDs
   private var nextJobId = 0
@@ -50,6 +60,9 @@ extends MScheduler with DAGScheduler with Logging
   // Driver for talking to Mesos
   var driver: SchedulerDriver = null
 
+  // Which nodes we have executors on
+  private val slavesWithExecutors = new HashSet[String]
+
   // JAR server, if any JARs were added by the user to the SparkContext
   var jarServer: HttpServer = null
 
@@ -110,11 +123,18 @@ extends MScheduler with DAGScheduler with Logging
           .build())
       }
     }
+    val memory = Resource.newBuilder()
+      .setName("mem")
+      .setType(Resource.Type.SCALAR)
+      .setScalar(Resource.Scalar.newBuilder()
+        .setValue(EXECUTOR_MEMORY).build())
+      .build()
     ExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
       .setUri(execScript)
       .setData(ByteString.copyFrom(createExecArg()))
       .setParams(params.build())
+      .addResources(memory)
       .build()
   }
 
@@ -138,6 +158,7 @@ extends MScheduler with DAGScheduler with Logging
       activeJobs -= job.getId
       activeJobsQueue.dequeueAll(x => (x == job))
       taskIdToJobId --= jobTasks(job.getId)
+      taskIdToSlaveId --= jobTasks(job.getId)
       jobTasks.remove(job.getId)
     }
   }
@@ -162,30 +183,32 @@ extends MScheduler with DAGScheduler with Logging
    * our active jobs for tasks in FIFO order. We fill each node with tasks in
    * a round-robin manner so that tasks are balanced across the cluster.
    */
-  override def resourceOffer(
-      d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) {
+  override def resourceOffer(d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) {
     synchronized {
       val tasks = new JArrayList[TaskDescription]
       val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
-      val availableMem = offers.map(o => getResource(o.getResourcesList(), "mem"))
+      val enoughMem = offers.map(o => {
+        val mem = getResource(o.getResourcesList(), "mem")
+        val slaveId = o.getSlaveId.getValue
+        mem > EXECUTOR_MEMORY || slavesWithExecutors.contains(slaveId)
+      })
       var launchedTask = false
       for (job <- activeJobsQueue) {
        do {
          launchedTask = false
-          for (i <- 0 until offers.size.toInt) {
-            try {
-              job.slaveOffer(offers(i), availableCpus(i), availableMem(i)) match {
-                case Some(task) =>
-                  tasks.add(task)
-                  taskIdToJobId(task.getTaskId.getValue) = job.getId
-                  jobTasks(job.getId) += task.getTaskId.getValue
-                  availableCpus(i) -= getResource(task.getResourcesList(), "cpus")
-                  availableMem(i) -= getResource(task.getResourcesList(), "mem")
-                  launchedTask = true
-                case None => {}
-              }
-            } catch {
-              case e: Exception => logError("Exception in resourceOffer", e)
+          for (i <- 0 until offers.size if enoughMem(i)) {
+            job.slaveOffer(offers(i), availableCpus(i)) match {
+              case Some(task) =>
+                tasks.add(task)
+                val tid = task.getTaskId.getValue
+                val sid = offers(i).getSlaveId.getValue
+                taskIdToJobId(tid) = job.getId
+                jobTasks(job.getId) += tid
+                taskIdToSlaveId(tid) = sid
+                slavesWithExecutors += sid
+                availableCpus(i) -= getResource(task.getResourcesList(), "cpus")
+                launchedTask = true
+              case None => {}
            }
          }
        } while (launchedTask)
@@ -214,19 +237,24 @@ extends MScheduler with DAGScheduler with Logging
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     synchronized {
       try {
-        taskIdToJobId.get(status.getTaskId.getValue) match {
+        val tid = status.getTaskId.getValue
+        if (status.getState == TaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+          // We lost the executor on this slave, so remember that it's gone
+          slavesWithExecutors -= taskIdToSlaveId(tid)
+        }
+        taskIdToJobId.get(tid) match {
           case Some(jobId) =>
             if (activeJobs.contains(jobId)) {
               activeJobs(jobId).statusUpdate(status)
             }
             if (isFinished(status.getState)) {
-              taskIdToJobId.remove(status.getTaskId.getValue)
+              taskIdToJobId.remove(tid)
               if (jobTasks.contains(jobId))
-                jobTasks(jobId) -= status.getTaskId.getValue
+                jobTasks(jobId) -= tid
+              taskIdToSlaveId.remove(tid)
             }
           case None =>
-            logInfo("Ignoring update from TID " + status.getTaskId.getValue +
-              " because its job is gone")
+            logInfo("Ignoring update from TID " + tid + " because its job is gone")
         }
       } catch {
         case e: Exception => logError("Exception in statusUpdate", e)
@@ -320,7 +348,28 @@ extends MScheduler with DAGScheduler with Logging
   override def frameworkMessage(d: SchedulerDriver, s: SlaveID, e: ExecutorID, b: Array[Byte]) {}
 
-  override def slaveLost(d: SchedulerDriver, s: SlaveID) {}
+  override def slaveLost(d: SchedulerDriver, s: SlaveID) {
+    slavesWithExecutors.remove(s.getValue)
+  }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+  /**
+   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a
+   * number of megabytes. This is used to figure out how much memory to claim
+   * from Mesos based on the SPARK_MEM environment variable.
+   */
+  def memoryStringToMb(str: String): Int = {
+    val lower = str.toLowerCase
+    if (lower.endsWith("k"))
+      (lower.substring(0, lower.length-1).toLong / 1024).toInt
+    else if (lower.endsWith("m"))
+      lower.substring(0, lower.length-1).toInt
+    else if (lower.endsWith("g"))
+      lower.substring(0, lower.length-1).toInt * 1024
+    else if (lower.endsWith("t"))
+      lower.substring(0, lower.length-1).toInt * 1024 * 1024
+    else // no suffix, so it's just a number in bytes
+      (lower.toLong / 1024 / 1024).toInt
+  }
 }
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 811725b055..2001205878 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -21,9 +21,8 @@ extends Job(jobId) with Logging
   // Maximum time to wait to run a task in a preferred location (in ms)
   val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "5000").toLong
 
-  // CPUs and memory to request per task
+  // CPUs to request per task
   val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
-  val MEM_PER_TASK = System.getProperty("spark.task.mem", "512").toDouble
 
   // Maximum times a task is allowed to fail before failing the job
   val MAX_TASK_FAILURES = 4
@@ -129,10 +128,8 @@ extends Job(jobId) with Logging
   }
 
   // Respond to an offer of a single slave from the scheduler by finding a task
-  def slaveOffer(offer: SlaveOffer, availableCpus: Double, availableMem: Double)
-      : Option[TaskDescription] = {
-    if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK &&
-        availableMem >= MEM_PER_TASK) {
+  def slaveOffer(offer: SlaveOffer, availableCpus: Double): Option[TaskDescription] = {
+    if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
       val host = offer.getHostname
@@ -146,7 +143,7 @@ extends Job(jobId) with Logging
         val prefStr = if(preferred) "preferred" else "non-preferred"
         val message = "Starting task %d:%d as TID %s on slave %s: %s (%s)".format(
-          jobId, index, taskId, offer.getSlaveId, host, prefStr)
+          jobId, index, taskId.getValue, offer.getSlaveId.getValue, host, prefStr)
         logInfo(message)
         // Do various bookkeeping
         tidToIndex(taskId.getValue) = index
@@ -161,12 +158,6 @@ extends Job(jobId) with Logging
           .setScalar(Resource.Scalar.newBuilder()
             .setValue(CPUS_PER_TASK).build())
           .build()
-        val memRes = Resource.newBuilder()
-          .setName("mem")
-          .setType(Resource.Type.SCALAR)
-          .setScalar(Resource.Scalar.newBuilder()
-            .setValue(MEM_PER_TASK).build())
-          .build()
         val serializedTask = Utils.serialize(task)
         logDebug("Serialized size: " + serializedTask.size)
         val taskName = "task %d:%d".format(jobId, index)
@@ -175,7 +166,6 @@ extends Job(jobId) with Logging
           .setSlaveId(offer.getSlaveId)
           .setName(taskName)
           .addResources(cpuRes)
-          .addResources(memRes)
           .setData(ByteString.copyFrom(serializedTask))
           .build())
     }
@@ -200,8 +190,8 @@ extends Job(jobId) with Logging
   }
 
   def taskFinished(status: TaskStatus) {
-    val tid = status.getTaskId
-    val index = tidToIndex(tid.getValue)
+    val tid = status.getTaskId.getValue
+    val index = tidToIndex(tid)
     if (!finished(index)) {
       tasksFinished += 1
       logInfo("Finished TID %s (progress: %d/%d)".format(
@@ -220,8 +210,8 @@ extends Job(jobId) with Logging
   }
 
   def taskLost(status: TaskStatus) {
-    val tid = status.getTaskId
-    val index = tidToIndex(tid.getValue)
+    val tid = status.getTaskId.getValue
+    val index = tidToIndex(tid)
     if (!finished(index)) {
       logInfo("Lost TID %s (task %d:%d)".format(tid, jobId, index))
       launched(index) = false
diff --git a/run b/run
index 73c4c84bd6..f7e5a82a92 100755
--- a/run
+++ b/run
@@ -22,8 +22,9 @@ if [ "x$MESOS_HOME" != "x" ] ; then
 fi
 
 if [ "x$SPARK_MEM" == "x" ] ; then
-  SPARK_MEM="300m"
+  SPARK_MEM="512m"
 fi
+export SPARK_MEM  # So that the process sees it and can report it to Mesos
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$SPARK_JAVA_OPTS"
-- 
cgit v1.2.3