author    Justin Ma <jtma@eecs.berkeley.edu>  2010-09-07 14:03:59 -0700
committer Justin Ma <jtma@eecs.berkeley.edu>  2010-09-07 14:03:59 -0700
commit    6f0d2c1cbcd04fb72334e77eb3809b472e657985 (patch)
tree      66b191d16883359db946c5cefaab3f40a00adc3a /src
parent    e9ffe6caabc0bb763f0ac758149fb8c0336731bb (diff)
Add round-robin scheduling of tasks
Diffstat (limited to 'src')
-rw-r--r--  src/scala/spark/HdfsFile.scala        |  2
-rw-r--r--  src/scala/spark/MesosScheduler.scala  | 34
-rw-r--r--  src/scala/spark/SparkContext.scala    |  2
3 files changed, 25 insertions(+), 13 deletions(-)
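
Editor's note: the core of this commit is the loop added to MesosScheduler.scala below, which keeps sweeping the current offers, launches at most one task per offer per pass, deducts that task's resources, and stops once a full pass launches nothing. A minimal self-contained sketch of that loop follows; the offer sizes and per-task requirements here are assumed for the demo, and Task is a stand-in for the Mesos TaskDescription type.

    import scala.collection.mutable.ArrayBuffer

    case class Task(offerIndex: Int)  // hypothetical stand-in for mesos.TaskDescription

    object RoundRobinDemo {
      def main(args: Array[String]): Unit = {
        // Remaining resources of two assumed offers.
        val availableCpus = Array(4, 2)
        val availableMem  = Array(2000, 1000)       // MB
        val (desiredCpus, desiredMem) = (1, 500)    // per-task needs, as in the diff
        var tasksLeft = 5
        val launchedTasks = ArrayBuffer[Task]()

        // Round robin: sweep all offers, take at most one task per offer
        // per pass, and stop once a whole pass launches nothing.
        var launchedThisPass = true
        while (launchedThisPass) {
          launchedThisPass = false
          for (i <- availableCpus.indices if tasksLeft > 0) {
            if (availableCpus(i) >= desiredCpus && availableMem(i) >= desiredMem) {
              launchedTasks += Task(i)
              availableCpus(i) -= desiredCpus
              availableMem(i)  -= desiredMem
              tasksLeft -= 1
              launchedThisPass = true
            }
          }
        }
        // Prints 0, 1, 0, 1, 0: tasks alternate across the two offers.
        println(launchedTasks.map(_.offerIndex).mkString(", "))
      }
    }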
diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala
index 1c007c679a..595386fceb 100644
--- a/src/scala/spark/HdfsFile.scala
+++ b/src/scala/spark/HdfsFile.scala
@@ -25,7 +25,7 @@ extends RDD[String](sc) {
ConfigureLock.synchronized { inputFormat.configure(conf) }
@transient val splits_ =
- inputFormat.getSplits(conf, 2).map(new HdfsSplit(_)).toArray
+ inputFormat.getSplits(conf, sc.scheduler.numCores).map(new HdfsSplit(_)).toArray
override def splits = splits_.asInstanceOf[Array[Split]]
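
Editor's note: the hunk above replaces the hardcoded hint of 2 splits with the scheduler's total core count, so the number of input splits scales with the cluster. A rough illustration of how the numSplits hint feeds split sizing in Hadoop's old-API FileInputFormat (all sizes assumed for the demo):

    object SplitHint {
      def main(args: Array[String]): Unit = {
        val totalSize = 256L << 20   // 256 MB of input (assumed)
        val blockSize = 128L << 20   // 128 MB HDFS blocks (assumed)
        // Old-API FileInputFormat aims for totalSize / numSplits bytes per
        // split, capped at the block size.
        for (numSplits <- Seq(2, 8)) {  // old hardcoded hint vs. a numCores-like hint
          val splitSize = math.min(totalSize / numSplits, blockSize)
          println(s"hint=$numSplits -> ${totalSize / splitSize} splits")
        }
      }
    }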
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index b8533e0bd9..7f82c49348 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -3,6 +3,7 @@ package spark
import java.io.File
import scala.collection.mutable.Map
+import scala.collection.JavaConversions._
import mesos.{Scheduler => NScheduler}
import mesos._
@@ -105,10 +106,20 @@ extends NScheduler with spark.Scheduler
val tasks = new java.util.ArrayList[TaskDescription]
if (activeOp != null) {
try {
- for (i <- 0 until offers.size.toInt) {
- activeOp.slaveOffer(offers.get(i)) match {
- case Some(task) => tasks.add(task)
- case None => {}
+ val availableCpus = offers.map(_.getParams.get("cpus").toInt)
+ val availableMem = offers.map(_.getParams.get("mem").toInt)
+ var resourcesAvailable = true
+ while (resourcesAvailable) {
+ resourcesAvailable = false
+ for (i <- 0 until offers.size.toInt) {
+ activeOp.slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
+ case Some(task) =>
+ tasks.add(task)
+ availableCpus(i) -= task.getParams.get("cpus").toInt
+ availableMem(i) -= task.getParams.get("mem").toInt
+ resourcesAvailable = resourcesAvailable || true
+ case None => {}
+ }
}
}
} catch {
@@ -162,7 +173,7 @@ extends NScheduler with spark.Scheduler
// Trait representing an object that manages a parallel operation by
// implementing various scheduler callbacks.
trait ParallelOperation {
- def slaveOffer(s: SlaveOffer): Option[TaskDescription]
+ def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int): Option[TaskDescription]
def statusUpdate(t: TaskStatus): Unit
def error(code: Int, message: String): Unit
}
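
Editor's note: with the new signature, an implementation checks the remaining resources the scheduler passes in, not the raw offer totals. A minimal sketch of the idea; Offer and Task are stand-ins for the Mesos SlaveOffer and TaskDescription types so the snippet compiles on its own:

    case class Offer(slaveId: String)                   // stand-in for mesos.SlaveOffer
    case class Task(name: String, cpus: Int, mem: Int)  // stand-in for mesos.TaskDescription

    class SimpleOp(numTasks: Int) {
      private var tasksLaunched = 0
      private val (desiredCpus, desiredMem) = (1, 500)

      // Mirrors slaveOffer(s, availableCpus, availableMem): launch only if
      // the offer's *remaining* resources still cover one task.
      def slaveOffer(o: Offer, availableCpus: Int, availableMem: Int): Option[Task] =
        if (tasksLaunched < numTasks &&
            availableCpus >= desiredCpus && availableMem >= desiredMem) {
          tasksLaunched += 1
          Some(Task(s"task-$tasksLaunched on ${o.slaveId}", desiredCpus, desiredMem))
        } else None
    }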
@@ -207,7 +218,7 @@ extends ParallelOperation
}
}
- def slaveOffer(offer: SlaveOffer): Option[TaskDescription] = {
+ def slaveOffer(offer: SlaveOffer, availableCpus: Int, availableMem: Int): Option[TaskDescription] = {
if (tasksLaunched < numTasks) {
var checkPrefVals: Array[Boolean] = Array(true)
val time = System.currentTimeMillis
@@ -215,9 +226,8 @@ extends ParallelOperation
checkPrefVals = Array(true, false) // Allow non-preferred tasks
// TODO: Make desiredCpus and desiredMem configurable
val desiredCpus = 1
- val desiredMem = 750
- if (offer.getParams.get("cpus").toInt < desiredCpus ||
- offer.getParams.get("mem").toInt < desiredMem)
+ val desiredMem = 500
+ if ((availableCpus < desiredCpus) || (availableMem < desiredMem))
return None
for (checkPref <- checkPrefVals; i <- 0 until numTasks) {
if (!launched(i) && (!checkPref ||
@@ -264,7 +274,7 @@ extends ParallelOperation
def taskFinished(status: TaskStatus) {
val tid = status.getTaskId
- println("Finished TID " + tid)
+ print("Finished TID " + tid)
if (!finished(tidToIndex(tid))) {
// Deserialize task result
val result = Utils.deserialize[TaskResult[T]](status.getData)
@@ -274,10 +284,12 @@ extends ParallelOperation
// Mark finished and stop if we've finished all the tasks
finished(tidToIndex(tid)) = true
tasksFinished += 1
+
+ println(", finished " + tasksFinished + "/" + numTasks)
if (tasksFinished == numTasks)
setAllFinished()
} else {
- printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
+ printf("... Task %s had already finished, so ignoring it\n", tidToIndex(tid))
}
}
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index b1ddd80f87..1188367bdd 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -25,7 +25,7 @@ class SparkContext(master: String, frameworkName: String) {
val LOCAL_REGEX = """local\[([0-9]+)\]""".r
- private var scheduler: Scheduler = master match {
+ private[spark] var scheduler: Scheduler = master match {
case "local" => new LocalScheduler(1)
case LOCAL_REGEX(threads) => new LocalScheduler(threads.toInt)
case _ => { System.loadLibrary("mesos");
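
Editor's note: the master-string dispatch above can be exercised standalone. This sketch reuses the LOCAL_REGEX pattern from the hunk and substitutes println for the actual scheduler construction:

    object MasterDispatchDemo {
      val LOCAL_REGEX = """local\[([0-9]+)\]""".r

      def main(args: Array[String]): Unit = {
        for (master <- Seq("local", "local[4]", "master@host:5050")) {
          master match {
            case "local"              => println("LocalScheduler with 1 thread")
            case LOCAL_REGEX(threads) => println(s"LocalScheduler with $threads threads")
            case _                    => println("MesosScheduler (loads the native mesos library)")
          }
        }
      }
    }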