author    Matei Zaharia <matei@eecs.berkeley.edu>  2011-10-30 14:10:56 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2011-10-30 14:10:56 -0700
commit    157279e9eb4d732c94cf4c5a7cfcd840b0da300c
tree      4a89d8cc4b3d98904852af16a887e6d1b75da1ef
parent    3a0e6c436317457c8483c7106d5ba6551b6367c4
Update Spark to work with the latest Mesos API
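The new API moves the framework name and ExecutorInfo out of Scheduler
callbacks and into the MesosSchedulerDriver constructor, batches offers into a
single resourceOffers() callback that takes Offer (formerly SlaveOffer), and
replaces replyToOffer() with a per-offer launchTasks() call plus a Filters
refuse timeout. A minimal sketch of the new launch path, using only calls that
appear in the diff below (the helper itself is illustrative, not part of this
commit):

    import java.util.{List => JList, ArrayList => JArrayList}
    import org.apache.mesos.SchedulerDriver
    import org.apache.mesos.Protos._
    import scala.collection.JavaConversions._

    // Launch each offer's task list, telling Mesos to hold back (refuse)
    // anything unused for one second before re-offering it.
    def launchPerOffer(d: SchedulerDriver,
                       offers: JList[Offer],
                       tasks: IndexedSeq[JArrayList[TaskDescription]]) {
      val filters = Filters.newBuilder().setRefuseSeconds(1).build()
      for (i <- 0 until offers.size if tasks(i).size > 0) {
        d.launchTasks(offers(i).getId(), tasks(i), filters)
      }
    }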
-rw-r--r--  core/lib/mesos.jar                              bin 126006 -> 147412 bytes
-rw-r--r--  core/src/main/scala/spark/Job.scala               2
-rw-r--r--  core/src/main/scala/spark/MesosScheduler.scala   21
-rw-r--r--  core/src/main/scala/spark/SimpleJob.scala         2
4 files changed, 12 insertions, 13 deletions
diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar
index b347426692..941966c46a 100644
--- a/core/lib/mesos.jar
+++ b/core/lib/mesos.jar
Binary files differ
diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala
index acff8ce561..2200fb0c5d 100644
--- a/core/src/main/scala/spark/Job.scala
+++ b/core/src/main/scala/spark/Job.scala
@@ -8,7 +8,7 @@ import org.apache.mesos.Protos._
* job by implementing various callbacks.
*/
abstract class Job(jobId: Int) {
- def slaveOffer(s: SlaveOffer, availableCpus: Double): Option[TaskDescription]
+ def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription]
def statusUpdate(t: TaskStatus): Unit
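After this change the Job interface is written entirely against the new
protobuf types. Roughly, as a sketch assembled from the hunk above (the getId
accessor is assumed from its use later in this diff):

    import org.apache.mesos.Protos._

    // A Job answers each Offer with at most one task and receives status
    // updates for the tasks it has launched.
    abstract class Job(jobId: Int) {
      def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription]
      def statusUpdate(t: TaskStatus): Unit
      def getId: Int = jobId  // assumed accessor; MesosScheduler calls job.getId
    }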
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 35cf6ae539..14c17123a3 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -91,9 +91,9 @@ extends MScheduler with DAGScheduler with Logging
setDaemon(true)
override def run {
val sched = MesosScheduler.this
- sched.driver = new MesosSchedulerDriver(sched, master)
+ driver = new MesosSchedulerDriver(sched, frameworkName, getExecutorInfo, master)
try {
- val ret = sched.driver.run()
+ val ret = driver.run()
logInfo("driver.run() returned with code " + ret)
} catch {
case e: Exception =>
@@ -103,9 +103,7 @@ extends MScheduler with DAGScheduler with Logging
}.start
}
- override def getFrameworkName(d: SchedulerDriver): String = frameworkName
-
- override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo = {
+ def getExecutorInfo(): ExecutorInfo = {
val sparkHome = sc.getSparkHome match {
case Some(path) => path
case None =>
@@ -183,9 +181,9 @@ extends MScheduler with DAGScheduler with Logging
* our active jobs for tasks in FIFO order. We fill each node with tasks in
* a round-robin manner so that tasks are balanced across the cluster.
*/
- override def resourceOffer(d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) {
+ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
- val tasks = new JArrayList[TaskDescription]
+ val tasks = offers.map(o => new JArrayList[TaskDescription])
val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
val enoughMem = offers.map(o => {
val mem = getResource(o.getResourcesList(), "mem")
@@ -199,7 +197,7 @@ extends MScheduler with DAGScheduler with Logging
for (i <- 0 until offers.size if enoughMem(i)) {
job.slaveOffer(offers(i), availableCpus(i)) match {
case Some(task) =>
- tasks.add(task)
+ tasks(i).add(task)
val tid = task.getTaskId.getValue
val sid = offers(i).getSlaveId.getValue
taskIdToJobId(tid) = job.getId
@@ -213,9 +211,10 @@ extends MScheduler with DAGScheduler with Logging
}
} while (launchedTask)
}
- val params = new JHashMap[String, String]
- params.put("timeout", "1")
- d.replyToOffer(oid, tasks, params) // TODO: use smaller timeout?
+ val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+ for (i <- 0 until offers.size if tasks(i).size > 0) {
+ d.launchTasks(offers(i).getId(), tasks(i), filters)
+ }
}
}
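The loop above is the FIFO round-robin fill described in the comment: sweep
all offers repeatedly, adding at most one task per offer per sweep, and stop
once a full sweep launches nothing. A standalone model of that shape (a
hypothetical helper; Spark's version also tracks available CPUs and memory
per offer):

    import java.util.{ArrayList => JArrayList}

    // Round-robin fill: tryLaunch returns Some(task) while an offer can
    // still take work and None once it is full, so every offer grows at
    // the same rate and tasks spread evenly across the cluster.
    def fillRoundRobin[O, T](offers: IndexedSeq[O],
                             tryLaunch: O => Option[T]): IndexedSeq[JArrayList[T]] = {
      val tasks = offers.map(_ => new JArrayList[T])
      var launchedTask = true
      while (launchedTask) {
        launchedTask = false
        for (i <- 0 until offers.size) {
          tryLaunch(offers(i)) match {
            case Some(task) => tasks(i).add(task); launchedTask = true
            case None =>
          }
        }
      }
      tasks
    }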
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 9eee747cfd..d982a75ba0 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -128,7 +128,7 @@ extends Job(jobId) with Logging
}
// Respond to an offer of a single slave from the scheduler by finding a task
- def slaveOffer(offer: SlaveOffer, availableCpus: Double): Option[TaskDescription] = {
+ def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskDescription] = {
if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
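Apart from the Offer type, slaveOffer is unchanged: the localOnly flag above
implements delay scheduling, insisting on preferred hosts while a recent
preferred-location launch suggests locality is still worth waiting for. The
gate in isolation (constant and names illustrative, not Spark's actual
values):

    // Delay scheduling as one predicate: within LOCALITY_WAIT ms of the
    // last preferred-location launch, accept only preferred hosts; after
    // that, fall back to any host rather than leave CPUs idle.
    val LOCALITY_WAIT = 5000L  // illustrative wait, in milliseconds

    def hostAcceptable(host: String,
                       preferredHosts: Set[String],
                       lastPreferredLaunchTime: Long): Boolean = {
      val localOnly =
        System.currentTimeMillis - lastPreferredLaunchTime < LOCALITY_WAIT
      preferredHosts.isEmpty || preferredHosts.contains(host) || !localOnly
    }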