-rw-r--r--  core/lib/mesos.jar                              bin 124677 -> 126006 bytes
-rw-r--r--  core/src/main/scala/spark/Executor.scala         13
-rw-r--r--  core/src/main/scala/spark/Job.scala               3
-rw-r--r--  core/src/main/scala/spark/MesosScheduler.scala  103
-rw-r--r--  core/src/main/scala/spark/SimpleJob.scala         26
-rwxr-xr-x  run                                               3
6 files changed, 94 insertions, 54 deletions
diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar
index 171a5ac02c..f1fde967c4 100644
--- a/core/lib/mesos.jar
+++ b/core/lib/mesos.jar
Binary files differ
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 80c95dcb99..a2af70989c 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -53,7 +53,8 @@ class Executor extends org.apache.mesos.Executor with Logging {
class TaskRunner(desc: TaskDescription, d: ExecutorDriver)
extends Runnable {
override def run() = {
- logInfo("Running task ID " + desc.getTaskId)
+ val tid = desc.getTaskId.getValue
+ logInfo("Running task ID " + tid)
d.sendStatusUpdate(TaskStatus.newBuilder()
.setTaskId(desc.getTaskId)
.setState(TaskState.TASK_RUNNING)
@@ -65,7 +66,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
for (gen <- task.generation) // Update generation if any is set
env.mapOutputTracker.updateGeneration(gen)
- val value = task.run(desc.getTaskId.getValue.toInt)
+ val value = task.run(tid.toInt)
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
d.sendStatusUpdate(TaskStatus.newBuilder()
@@ -73,7 +74,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
.setState(TaskState.TASK_FINISHED)
.setData(ByteString.copyFrom(Utils.serialize(result)))
.build())
- logInfo("Finished task ID " + desc.getTaskId)
+ logInfo("Finished task ID " + tid)
} catch {
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
@@ -85,7 +86,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
}
case t: Throwable => {
// TODO: Handle errors in tasks less dramatically
- logError("Exception in task ID " + desc.getTaskId, t)
+ logError("Exception in task ID " + tid, t)
System.exit(1)
}
}
@@ -144,8 +145,8 @@ class Executor extends org.apache.mesos.Executor with Logging {
logError("Error from Mesos: %s (code %d)".format(message, code))
}
- override def killTask(d: ExecutorDriver, tid: TaskID) {
- logWarning("Mesos asked us to kill task " + tid + "; ignoring (not yet implemented)")
+ override def killTask(d: ExecutorDriver, t: TaskID) {
+ logWarning("Mesos asked us to kill task " + t.getValue + "; ignoring (not yet implemented)")
}
override def shutdown(d: ExecutorDriver) {}
diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala
index 9d75c08398..acff8ce561 100644
--- a/core/src/main/scala/spark/Job.scala
+++ b/core/src/main/scala/spark/Job.scala
@@ -8,8 +8,7 @@ import org.apache.mesos.Protos._
* job by implementing various callbacks.
*/
abstract class Job(jobId: Int) {
- def slaveOffer(s: SlaveOffer, availableCpus: Double, availableMem: Double)
- : Option[TaskDescription]
+ def slaveOffer(s: SlaveOffer, availableCpus: Double): Option[TaskDescription]
def statusUpdate(t: TaskStatus): Unit
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 7f43fffebf..9ca316d953 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -33,15 +33,25 @@ extends MScheduler with DAGScheduler with Logging
"SPARK_LIBRARY_PATH"
)
+ // Memory used by each executor (in megabytes)
+ val EXECUTOR_MEMORY = {
+ if (System.getenv("SPARK_MEM") != null)
+ memoryStringToMb(System.getenv("SPARK_MEM"))
+ // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+ else
+ 512
+ }
+
// Lock used to wait for scheduler to be registered
private var isRegistered = false
private val registeredLock = new Object()
- private var activeJobs = new HashMap[Int, Job]
- private var activeJobsQueue = new Queue[Job]
+ private val activeJobs = new HashMap[Int, Job]
+ private val activeJobsQueue = new Queue[Job]
- private var taskIdToJobId = new HashMap[String, Int]
- private var jobTasks = new HashMap[Int, HashSet[String]]
+ private val taskIdToJobId = new HashMap[String, Int]
+ private val taskIdToSlaveId = new HashMap[String, String]
+ private val jobTasks = new HashMap[Int, HashSet[String]]
// Incrementing job and task IDs
private var nextJobId = 0
@@ -50,6 +60,9 @@ extends MScheduler with DAGScheduler with Logging
// Driver for talking to Mesos
var driver: SchedulerDriver = null
+ // Which nodes we have executors on
+ private val slavesWithExecutors = new HashSet[String]
+
// JAR server, if any JARs were added by the user to the SparkContext
var jarServer: HttpServer = null
@@ -110,11 +123,18 @@ extends MScheduler with DAGScheduler with Logging
.build())
}
}
+ val memory = Resource.newBuilder()
+ .setName("mem")
+ .setType(Resource.Type.SCALAR)
+ .setScalar(Resource.Scalar.newBuilder()
+ .setValue(EXECUTOR_MEMORY).build())
+ .build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue("default").build())
.setUri(execScript)
.setData(ByteString.copyFrom(createExecArg()))
.setParams(params.build())
+ .addResources(memory)
.build()
}
@@ -138,6 +158,7 @@ extends MScheduler with DAGScheduler with Logging
activeJobs -= job.getId
activeJobsQueue.dequeueAll(x => (x == job))
taskIdToJobId --= jobTasks(job.getId)
+ taskIdToSlaveId --= jobTasks(job.getId)
jobTasks.remove(job.getId)
}
}
@@ -162,30 +183,32 @@ extends MScheduler with DAGScheduler with Logging
* our active jobs for tasks in FIFO order. We fill each node with tasks in
* a round-robin manner so that tasks are balanced across the cluster.
*/
- override def resourceOffer(
- d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) {
+ override def resourceOffer(d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) {
synchronized {
val tasks = new JArrayList[TaskDescription]
val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
- val availableMem = offers.map(o => getResource(o.getResourcesList(), "mem"))
+ val enoughMem = offers.map(o => {
+ val mem = getResource(o.getResourcesList(), "mem")
+ val slaveId = o.getSlaveId.getValue
+ mem > EXECUTOR_MEMORY || slavesWithExecutors.contains(slaveId)
+ })
var launchedTask = false
for (job <- activeJobsQueue) {
do {
launchedTask = false
- for (i <- 0 until offers.size.toInt) {
- try {
- job.slaveOffer(offers(i), availableCpus(i), availableMem(i)) match {
- case Some(task) =>
- tasks.add(task)
- taskIdToJobId(task.getTaskId.getValue) = job.getId
- jobTasks(job.getId) += task.getTaskId.getValue
- availableCpus(i) -= getResource(task.getResourcesList(), "cpus")
- availableMem(i) -= getResource(task.getResourcesList(), "mem")
- launchedTask = true
- case None => {}
- }
- } catch {
- case e: Exception => logError("Exception in resourceOffer", e)
+ for (i <- 0 until offers.size if enoughMem(i)) {
+ job.slaveOffer(offers(i), availableCpus(i)) match {
+ case Some(task) =>
+ tasks.add(task)
+ val tid = task.getTaskId.getValue
+ val sid = offers(i).getSlaveId.getValue
+ taskIdToJobId(tid) = job.getId
+ jobTasks(job.getId) += tid
+ taskIdToSlaveId(tid) = sid
+ slavesWithExecutors += sid
+ availableCpus(i) -= getResource(task.getResourcesList(), "cpus")
+ launchedTask = true
+ case None => {}
}
}
} while (launchedTask)
@@ -214,19 +237,24 @@ extends MScheduler with DAGScheduler with Logging
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
synchronized {
try {
- taskIdToJobId.get(status.getTaskId.getValue) match {
+ val tid = status.getTaskId.getValue
+ if (status.getState == TaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+ // We lost the executor on this slave, so remember that it's gone
+ slavesWithExecutors -= taskIdToSlaveId(tid)
+ }
+ taskIdToJobId.get(tid) match {
case Some(jobId) =>
if (activeJobs.contains(jobId)) {
activeJobs(jobId).statusUpdate(status)
}
if (isFinished(status.getState)) {
- taskIdToJobId.remove(status.getTaskId.getValue)
+ taskIdToJobId.remove(tid)
if (jobTasks.contains(jobId))
- jobTasks(jobId) -= status.getTaskId.getValue
+ jobTasks(jobId) -= tid
+ taskIdToSlaveId.remove(tid)
}
case None =>
- logInfo("Ignoring update from TID " + status.getTaskId.getValue +
- " because its job is gone")
+ logInfo("Ignoring update from TID " + tid + " because its job is gone")
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
@@ -320,7 +348,28 @@ extends MScheduler with DAGScheduler with Logging
override def frameworkMessage(d: SchedulerDriver, s: SlaveID, e: ExecutorID, b: Array[Byte]) {}
- override def slaveLost(d: SchedulerDriver, s: SlaveID) {}
+ override def slaveLost(d: SchedulerDriver, s: SlaveID) {
+ slavesWithExecutors.remove(s.getValue)
+ }
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+ /**
+ * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a
+ * number of megabytes. This is used to figure out how much memory to claim
+ * from Mesos based on the SPARK_MEM environment variable.
+ */
+ def memoryStringToMb(str: String): Int = {
+ val lower = str.toLowerCase
+ if (lower.endsWith("k"))
+ (lower.substring(0, lower.length-1).toLong / 1024).toInt
+ else if (lower.endsWith("m"))
+ lower.substring(0, lower.length-1).toInt
+ else if (lower.endsWith("g"))
+ lower.substring(0, lower.length-1).toInt * 1024
+ else if (lower.endsWith("t"))
+ lower.substring(0, lower.length-1).toInt * 1024 * 1024
+ else // no suffix, so it's just a number in bytes
+ (lower.toLong / 1024 / 1024).toInt
+ }
}
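
[Note] A quick illustration of the memoryStringToMb helper added above. The conversion logic is copied from the patch; the MemoryStringSketch object and main method are added here only so the sketch compiles and runs on its own. Note that a value with no suffix is treated as a byte count, matching the comment in the patch.

object MemoryStringSketch {
  // Standalone copy of the helper added to MesosScheduler.scala above.
  def memoryStringToMb(str: String): Int = {
    val lower = str.toLowerCase
    if (lower.endsWith("k"))
      (lower.substring(0, lower.length - 1).toLong / 1024).toInt
    else if (lower.endsWith("m"))
      lower.substring(0, lower.length - 1).toInt
    else if (lower.endsWith("g"))
      lower.substring(0, lower.length - 1).toInt * 1024
    else if (lower.endsWith("t"))
      lower.substring(0, lower.length - 1).toInt * 1024 * 1024
    else // no suffix, so it's just a number in bytes
      (lower.toLong / 1024 / 1024).toInt
  }

  def main(args: Array[String]): Unit = {
    println(memoryStringToMb("512m"))    // 512
    println(memoryStringToMb("2g"))      // 2048
    println(memoryStringToMb("1048576")) // 1 (plain numbers are interpreted as bytes)
  }
}
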
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 811725b055..2001205878 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -21,9 +21,8 @@ extends Job(jobId) with Logging
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "5000").toLong
- // CPUs and memory to request per task
+ // CPUs to request per task
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
- val MEM_PER_TASK = System.getProperty("spark.task.mem", "512").toDouble
// Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4
@@ -129,10 +128,8 @@ extends Job(jobId) with Logging
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(offer: SlaveOffer, availableCpus: Double, availableMem: Double)
- : Option[TaskDescription] = {
- if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK &&
- availableMem >= MEM_PER_TASK) {
+ def slaveOffer(offer: SlaveOffer, availableCpus: Double): Option[TaskDescription] = {
+ if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
val host = offer.getHostname
@@ -146,7 +143,7 @@ extends Job(jobId) with Logging
val prefStr = if(preferred) "preferred" else "non-preferred"
val message =
"Starting task %d:%d as TID %s on slave %s: %s (%s)".format(
- jobId, index, taskId, offer.getSlaveId, host, prefStr)
+ jobId, index, taskId.getValue, offer.getSlaveId.getValue, host, prefStr)
logInfo(message)
// Do various bookkeeping
tidToIndex(taskId.getValue) = index
@@ -161,12 +158,6 @@ extends Job(jobId) with Logging
.setScalar(Resource.Scalar.newBuilder()
.setValue(CPUS_PER_TASK).build())
.build()
- val memRes = Resource.newBuilder()
- .setName("mem")
- .setType(Resource.Type.SCALAR)
- .setScalar(Resource.Scalar.newBuilder()
- .setValue(MEM_PER_TASK).build())
- .build()
val serializedTask = Utils.serialize(task)
logDebug("Serialized size: " + serializedTask.size)
val taskName = "task %d:%d".format(jobId, index)
@@ -175,7 +166,6 @@ extends Job(jobId) with Logging
.setSlaveId(offer.getSlaveId)
.setName(taskName)
.addResources(cpuRes)
- .addResources(memRes)
.setData(ByteString.copyFrom(serializedTask))
.build())
}
@@ -200,8 +190,8 @@ extends Job(jobId) with Logging
}
def taskFinished(status: TaskStatus) {
- val tid = status.getTaskId
- val index = tidToIndex(tid.getValue)
+ val tid = status.getTaskId.getValue
+ val index = tidToIndex(tid)
if (!finished(index)) {
tasksFinished += 1
logInfo("Finished TID %s (progress: %d/%d)".format(
@@ -220,8 +210,8 @@ extends Job(jobId) with Logging
}
def taskLost(status: TaskStatus) {
- val tid = status.getTaskId
- val index = tidToIndex(tid.getValue)
+ val tid = status.getTaskId.getValue
+ val index = tidToIndex(tid)
if (!finished(index)) {
logInfo("Lost TID %s (task %d:%d)".format(tid, jobId, index))
launched(index) = false
diff --git a/run b/run
index 73c4c84bd6..f7e5a82a92 100755
--- a/run
+++ b/run
@@ -22,8 +22,9 @@ if [ "x$MESOS_HOME" != "x" ] ; then
fi
if [ "x$SPARK_MEM" == "x" ] ; then
- SPARK_MEM="300m"
+ SPARK_MEM="512m"
fi
+export SPARK_MEM # So that the process sees it and can report it to Mesos
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
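
[Note] Taken together, the run script now exports SPARK_MEM (default 512m), MesosScheduler converts it to EXECUTOR_MEMORY in megabytes, and resource offers are accepted only on slaves whose "mem" resource exceeds that amount or that already host one of our executors. The sketch below restates that acceptance rule on its own; OfferFilterSketch, acceptable, and its parameter names are invented here purely for illustration and are not part of the patch.

object OfferFilterSketch {
  // Mirrors the enoughMem check added to resourceOffer: strict '>' against
  // the executor memory, or the slave already has an executor running.
  def acceptable(offerMemMb: Double,
                 executorMemoryMb: Int,
                 slaveAlreadyHasExecutor: Boolean): Boolean =
    offerMemMb > executorMemoryMb || slaveAlreadyHasExecutor

  def main(args: Array[String]): Unit = {
    // A fresh slave offering 1024 MB can host a 512 MB executor.
    println(acceptable(1024, 512, slaveAlreadyHasExecutor = false)) // true
    // A fresh slave offering only 256 MB cannot.
    println(acceptable(256, 512, slaveAlreadyHasExecutor = false))  // false
    // A slave that already runs our executor keeps receiving tasks.
    println(acceptable(256, 512, slaveAlreadyHasExecutor = true))   // true
  }
}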