From a9481c3514b33389de7a7db5e5c0ac8291c11f1c Mon Sep 17 00:00:00 2001 From: root Date: Fri, 13 Aug 2010 07:39:36 +0000 Subject: Update to work with latest Mesos API changes --- run | 7 ++++++- src/scala/spark/MesosScheduler.scala | 18 +++++++++--------- third_party/mesos.jar | Bin 43915 -> 36494 bytes 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/run b/run index 7353311522..7e044eb021 100755 --- a/run +++ b/run @@ -8,7 +8,12 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then . $FWDIR/conf/spark-env.sh fi -if [ "$SPARK_MEM" == "" ] ; then +if [ "x$MESOS_HOME" != "x" ] ; then + SPARK_CLASSPATH="$MESOS_HOME/lib/java/mesos.jar:$SPARK_CLASSPATH" + SPARK_LIBRARY_PATH="$MESOS_HOME/lib/java:$SPARK_LIBARY_PATH" +fi + +if [ "x$SPARK_MEM" == "x" ] ; then SPARK_MEM="300m" fi diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala index 84b9d9af68..74c1d8dfb5 100644 --- a/src/scala/spark/MesosScheduler.scala +++ b/src/scala/spark/MesosScheduler.scala @@ -100,9 +100,9 @@ extends NScheduler with spark.Scheduler } override def resourceOffer( - d: SchedulerDriver, oid: String, offers: SlaveOfferVector) { + d: SchedulerDriver, oid: String, offers: java.util.List[SlaveOffer]) { synchronized { - val tasks = new TaskDescriptionVector + val tasks = new java.util.ArrayList[TaskDescription] if (activeOp != null) { try { for (i <- 0 until offers.size.toInt) { @@ -115,8 +115,8 @@ extends NScheduler with spark.Scheduler case e: Exception => e.printStackTrace } } - val params = new StringMap - params.set("timeout", "1") + val params = new java.util.HashMap[String, String] + params.put("timeout", "1") d.replyToOffer(oid, tasks, params) // TODO: use smaller timeout } } @@ -212,9 +212,9 @@ extends ParallelOperation checkPrefVals = Array(true, false) // Allow non-preferred tasks // TODO: Make desiredCpus and desiredMem configurable val desiredCpus = 1 - val desiredMem = 750L * 1024L * 1024L + val desiredMem = 750 if (offer.getParams.get("cpus").toInt < desiredCpus || - offer.getParams.get("mem").toLong < desiredMem) + offer.getParams.get("mem").toInt < desiredMem) return None for (checkPref <- checkPrefVals; i <- 0 until numTasks) { if (!launched(i) && (!checkPref || @@ -231,9 +231,9 @@ extends ParallelOperation tasksLaunched += 1 if (checkPref) lastPreferredLaunchTime = time - val params = new StringMap - params.set("cpus", "" + desiredCpus) - params.set("mem", "" + desiredMem) + val params = new java.util.HashMap[String, String] + params.put("cpus", "" + desiredCpus) + params.put("mem", "" + desiredMem) val serializedTask = Utils.serialize(tasks(i)) return Some(new TaskDescription(taskId, offer.getSlaveId, "task_" + taskId, params, serializedTask)) diff --git a/third_party/mesos.jar b/third_party/mesos.jar index 1751d85756..e9530161f2 100644 Binary files a/third_party/mesos.jar and b/third_party/mesos.jar differ -- cgit v1.2.3