author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-03-29 22:12:35 -0400
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-03-29 22:12:35 -0400
commit    dfa3b6b54492c944e40de8149917e5194b99a82b (patch)
tree      e3bc04857c9fddf19ea5669a2e9a2485137f39d2
parent    4d52cc67388876bcbddd70a2875b59695a5bd437 (diff)
Fixes to work with the very latest Mesos 0.9 API
-rw-r--r--  core/lib/mesos-0.9.0.jar                        | Bin 186497 -> 193496 bytes
-rw-r--r--  core/src/main/scala/spark/Executor.scala        |  26
-rw-r--r--  core/src/main/scala/spark/Job.scala             |   4
-rw-r--r--  core/src/main/scala/spark/MesosScheduler.scala  |  28
-rw-r--r--  core/src/main/scala/spark/SimpleJob.scala       |   9
5 files changed, 41 insertions(+), 26 deletions(-)
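
Taken together, the hunks below track a handful of Mesos 0.9 API changes: the executor's registered() callback loses its FrameworkID and SlaveID parameters, TaskDescription is renamed TaskInfo (and must now carry an ExecutorInfo), the error() callbacks drop their integer code, the scheduler driver is constructed from a FrameworkInfo message, and new disconnected/reregistered/executorLost callbacks must be implemented. As a reading aid, here is a sketch of the executor-side surface after the patch (types from org.apache.mesos and org.apache.mesos.Protos, as used in the hunks; the scheduler side follows the same pattern):

    import org.apache.mesos.ExecutorDriver
    import org.apache.mesos.Protos._

    trait Mesos09ExecutorCallbacks {
      // registered() no longer receives FrameworkID or SlaveID
      def registered(d: ExecutorDriver, executorInfo: ExecutorInfo,
                     frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit
      // tasks are described by TaskInfo rather than TaskDescription
      def launchTask(d: ExecutorDriver, task: TaskInfo): Unit
      // error() dropped its integer code parameter
      def error(d: ExecutorDriver, message: String): Unit
      // lifecycle callbacks new in 0.9
      def disconnected(d: ExecutorDriver): Unit
      def reregistered(d: ExecutorDriver, slaveInfo: SlaveInfo): Unit
    }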
diff --git a/core/lib/mesos-0.9.0.jar b/core/lib/mesos-0.9.0.jar
index d075085191..9f9d375bf8 100644
--- a/core/lib/mesos-0.9.0.jar
+++ b/core/lib/mesos-0.9.0.jar
Binary files differ
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 1583c083a7..de45137a4f 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -27,9 +27,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
override def registered(
driver: ExecutorDriver,
executorInfo: ExecutorInfo,
- frameworkId: FrameworkID,
frameworkInfo: FrameworkInfo,
- slaveId: SlaveID,
slaveInfo: SlaveInfo) {
// Read spark.* system properties from executor arg
val props = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
@@ -54,25 +52,29 @@ class Executor extends org.apache.mesos.Executor with Logging {
threadPool = new ThreadPoolExecutor(
1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
}
+
+ override def disconnected(d: ExecutorDriver) {}
+
+ override def reregistered(d: ExecutorDriver, s: SlaveInfo) {}
- override def launchTask(d: ExecutorDriver, task: TaskDescription) {
+ override def launchTask(d: ExecutorDriver, task: TaskInfo) {
threadPool.execute(new TaskRunner(task, d))
}
- class TaskRunner(desc: TaskDescription, d: ExecutorDriver)
+ class TaskRunner(info: TaskInfo, d: ExecutorDriver)
extends Runnable {
override def run() = {
- val tid = desc.getTaskId.getValue
+ val tid = info.getTaskId.getValue
logInfo("Running task ID " + tid)
d.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
+ .setTaskId(info.getTaskId)
.setState(TaskState.TASK_RUNNING)
.build())
try {
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(classLoader)
Accumulators.clear
- val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
+ val task = Utils.deserialize[Task[Any]](info.getData.toByteArray, classLoader)
for (gen <- task.generation) { // Update generation if any is set
env.mapOutputTracker.updateGeneration(gen)
}
@@ -80,7 +82,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
d.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
+ .setTaskId(info.getTaskId)
.setState(TaskState.TASK_FINISHED)
.setData(ByteString.copyFrom(Utils.serialize(result)))
.build())
@@ -89,7 +91,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
d.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
+ .setTaskId(info.getTaskId)
.setState(TaskState.TASK_FAILED)
.setData(ByteString.copyFrom(Utils.serialize(reason)))
.build())
@@ -97,7 +99,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
case t: Throwable => {
val reason = ExceptionFailure(t)
d.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(desc.getTaskId)
+ .setTaskId(info.getTaskId)
.setState(TaskState.TASK_FAILED)
.setData(ByteString.copyFrom(Utils.serialize(reason)))
.build())
@@ -160,8 +162,8 @@ class Executor extends org.apache.mesos.Executor with Logging {
Utils.copyStream(in, out, true)
}
- override def error(d: ExecutorDriver, code: Int, message: String) {
- logError("Error from Mesos: %s (code %d)".format(message, code))
+ override def error(d: ExecutorDriver, message: String) {
+ logError("Error from Mesos: " + message)
}
override def killTask(d: ExecutorDriver, t: TaskID) {
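
For reference, the status-update pattern the patched TaskRunner uses, condensed into one helper (a sketch only: the surrounding class and serialization are elided, and the helper name is ours):

    import com.google.protobuf.ByteString
    import org.apache.mesos.ExecutorDriver
    import org.apache.mesos.Protos.{TaskInfo, TaskState, TaskStatus}

    object StatusUpdates {
      // Report a task state to Mesos, attaching a serialized payload
      // (a TaskResult on success, a failure reason otherwise), exactly as
      // the TASK_RUNNING / TASK_FINISHED / TASK_FAILED updates above do.
      def report(d: ExecutorDriver, info: TaskInfo, state: TaskState,
                 payload: Array[Byte]): Unit = {
        d.sendStatusUpdate(TaskStatus.newBuilder()
          .setTaskId(info.getTaskId)
          .setState(state)
          .setData(ByteString.copyFrom(payload))
          .build())
      }
    }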
diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala
index 0d68470c03..b7b0361c62 100644
--- a/core/src/main/scala/spark/Job.scala
+++ b/core/src/main/scala/spark/Job.scala
@@ -8,9 +8,9 @@ import org.apache.mesos.Protos._
* callbacks.
*/
abstract class Job(val runId: Int, val jobId: Int) {
- def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription]
+ def slaveOffer(s: Offer, availableCpus: Double): Option[TaskInfo]
def statusUpdate(t: TaskStatus): Unit
- def error(code: Int, message: String): Unit
+ def error(message: String): Unit
}
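
A hypothetical minimal subclass, just to show the two changed member shapes (SimpleJob, further below, is the real implementation):

    import org.apache.mesos.Protos.{Offer, TaskInfo, TaskStatus}

    // Illustrative only: declines every offer and logs errors to stderr.
    class NoopJob(runId: Int, jobId: Int) extends Job(runId, jobId) {
      def slaveOffer(s: Offer, availableCpus: Double): Option[TaskInfo] = None
      def statusUpdate(t: TaskStatus): Unit = {}
      def error(message: String): Unit =
        System.err.println("Mesos error: " + message)
    }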
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 71b0c29162..6543e1112a 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -76,6 +76,9 @@ private class MesosScheduler(
// URIs of JARs to pass to executor
var jarUris: String = ""
+ // Create an ExecutorInfo for our tasks
+ val executorInfo = createExecutorInfo()
+
// Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first)
private val jobOrdering = new Ordering[Job] {
override def compare(j1: Job, j2: Job): Int = {
@@ -105,7 +108,8 @@ private class MesosScheduler(
setDaemon(true)
override def run {
val sched = MesosScheduler.this
- driver = new MesosSchedulerDriver(sched, frameworkName, getExecutorInfo, master)
+ val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
+ driver = new MesosSchedulerDriver(sched, fwInfo, master)
try {
val ret = driver.run()
logInfo("driver.run() returned with code " + ret)
@@ -116,7 +120,7 @@ private class MesosScheduler(
}.start
}
- def getExecutorInfo(): ExecutorInfo = {
+ def createExecutorInfo(): ExecutorInfo = {
val sparkHome = sc.getSparkHome match {
case Some(path) => path
case None =>
@@ -174,7 +178,7 @@ private class MesosScheduler(
}
}
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID) {
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
logInfo("Registered as framework ID " + frameworkId.getValue)
registeredLock.synchronized {
isRegistered = true
@@ -190,6 +194,10 @@ private class MesosScheduler(
}
}
+ override def disconnected(d: SchedulerDriver) {}
+
+ override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active jobs for
* tasks in FIFO order. We fill each node with tasks in a round-robin manner so that tasks are
@@ -197,7 +205,7 @@ private class MesosScheduler(
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
- val tasks = offers.map(o => new JArrayList[TaskDescription])
+ val tasks = offers.map(o => new JArrayList[TaskInfo])
val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
val enoughMem = offers.map(o => {
val mem = getResource(o.getResourcesList(), "mem")
@@ -280,14 +288,14 @@ private class MesosScheduler(
}
}
- override def error(d: SchedulerDriver, code: Int, message: String) {
- logError("Mesos error: %s (error code: %d)".format(message, code))
+ override def error(d: SchedulerDriver, message: String) {
+ logError("Mesos error: " + message)
synchronized {
if (activeJobs.size > 0) {
// Have each job throw a SparkException with the error
for ((jobId, activeJob) <- activeJobs) {
try {
- activeJob.error(code, message)
+ activeJob.error(message)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
@@ -366,14 +374,18 @@ private class MesosScheduler(
override def frameworkMessage(
d: SchedulerDriver,
- s: SlaveID,
e: ExecutorID,
+ s: SlaveID,
b: Array[Byte]) {}
override def slaveLost(d: SchedulerDriver, s: SlaveID) {
slavesWithExecutors.remove(s.getValue)
}
+ override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+ slavesWithExecutors.remove(s.getValue)
+ }
+
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
/**
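
The driver construction is the most visible scheduler-side change: 0.9 replaces the separate framework-name and ExecutorInfo constructor arguments with a single FrameworkInfo message, and the ExecutorInfo moves onto each TaskInfo instead (see SimpleJob below). The new wiring in isolation, as a sketch with the scheduler and master URL assumed in scope:

    import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
    import org.apache.mesos.Protos.FrameworkInfo

    object DriverWiring {
      // `scheduler` implements org.apache.mesos.Scheduler; `master` is a
      // Mesos master address (both assumed here for illustration).
      def start(scheduler: Scheduler, master: String): Unit = {
        val fwInfo = FrameworkInfo.newBuilder()
          .setUser("")  // empty user: Mesos fills in the current OS user
          .setName("spark")
          .build()
        val driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
        driver.run()  // blocks until the driver terminates
      }
    }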
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 2050558285..796498cfe4 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -141,7 +141,7 @@ class SimpleJob(
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskDescription] = {
+ def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskInfo] = {
if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -173,9 +173,10 @@ class SimpleJob(
val serializedTask = Utils.serialize(task)
logDebug("Serialized size: " + serializedTask.size)
val taskName = "task %d:%d".format(jobId, index)
- return Some(TaskDescription.newBuilder()
+ return Some(TaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(offer.getSlaveId)
+ .setExecutor(sched.executorInfo)
.setName(taskName)
.addResources(cpuRes)
.setData(ByteString.copyFrom(serializedTask))
@@ -290,9 +291,9 @@ class SimpleJob(
}
}
- def error(code: Int, message: String) {
+ def error(message: String) {
// Save the error message
- abort("Mesos error: %s (error code: %d)".format(message, code))
+ abort("Mesos error: " + message)
}
def abort(message: String) {
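
The scheduler-side counterpart of the ExecutorInfo move: every TaskInfo must now name its executor explicitly via setExecutor(). The builder chain from slaveOffer() above, condensed into a standalone helper (the helper name and parameter list are ours):

    import com.google.protobuf.ByteString
    import org.apache.mesos.Protos._

    object TaskInfoSketch {
      // setExecutor() attaches the shared ExecutorInfo that the 0.9 driver
      // constructor no longer accepts.
      def build(taskId: TaskID, offer: Offer, executor: ExecutorInfo,
                cpuRes: Resource, serializedTask: Array[Byte],
                name: String): TaskInfo =
        TaskInfo.newBuilder()
          .setTaskId(taskId)
          .setSlaveId(offer.getSlaveId)
          .setExecutor(executor)
          .setName(name)
          .addResources(cpuRes)
          .setData(ByteString.copyFrom(serializedTask))
          .build()
    }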