diff options
author | Charles Reiss <charles@eecs.berkeley.edu> | 2013-01-29 14:03:05 -0800 |
---|---|---|
committer | Charles Reiss <charles@eecs.berkeley.edu> | 2013-01-30 09:38:57 -0800 |
commit | f7de6978c14a331683e4a341fccd6e4c5e9fa523 (patch) | |
tree | 9079d2f2cd45228030f77ba4f265a9f1160c6d1a | |
parent | 64ba6a8c2c5f46e6de6deb6a6fd576a55cb3b198 (diff) | |
download | spark-f7de6978c14a331683e4a341fccd6e4c5e9fa523.tar.gz spark-f7de6978c14a331683e4a341fccd6e4c5e9fa523.tar.bz2 spark-f7de6978c14a331683e4a341fccd6e4c5e9fa523.zip |
Use Mesos ExecutorIDs to hold SlaveIDs. Then we can safely use
the Mesos ExecutorID as a Spark ExecutorID.
-rw-r--r-- | core/src/main/scala/spark/executor/MesosExecutorBackend.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 30 |
2 files changed, 21 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala index 1ef88075ad..b981b26916 100644 --- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala @@ -32,7 +32,11 @@ private[spark] class MesosExecutorBackend(executor: Executor) logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) this.driver = driver val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) - executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties) + executor.initialize( + slaveInfo.getId.getValue + "-" + executorInfo.getExecutorId.getValue, + slaveInfo.getHostname, + properties + ) } override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index f3467db86b..eab1c60e0b 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -51,7 +51,7 @@ private[spark] class MesosSchedulerBackend( val taskIdToSlaveId = new HashMap[Long, String] // An ExecutorInfo for our tasks - var executorInfo: ExecutorInfo = null + var execArgs: Array[Byte] = null override def start() { synchronized { @@ -70,12 +70,11 @@ private[spark] class MesosSchedulerBackend( } }.start() - executorInfo = createExecutorInfo() waitForRegister() } } - def createExecutorInfo(): ExecutorInfo = { + def createExecutorInfo(execId: String): ExecutorInfo = { val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( "Spark home is not set; set it through the spark.home system " + "property, the SPARK_HOME environment variable or the SparkContext constructor")) @@ -97,7 +96,7 @@ private[spark] class MesosSchedulerBackend( .setEnvironment(environment) .build() ExecutorInfo.newBuilder() - .setExecutorId(ExecutorID.newBuilder().setValue("default").build()) + .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) .addResources(memory) @@ -109,17 +108,20 @@ private[spark] class MesosSchedulerBackend( * containing all the spark.* system properties in the form of (String, String) pairs. */ private def createExecArg(): Array[Byte] = { - val props = new HashMap[String, String] - val iterator = System.getProperties.entrySet.iterator - while (iterator.hasNext) { - val entry = iterator.next - val (key, value) = (entry.getKey.toString, entry.getValue.toString) - if (key.startsWith("spark.")) { - props(key) = value + if (execArgs == null) { + val props = new HashMap[String, String] + val iterator = System.getProperties.entrySet.iterator + while (iterator.hasNext) { + val entry = iterator.next + val (key, value) = (entry.getKey.toString, entry.getValue.toString) + if (key.startsWith("spark.")) { + props(key) = value + } } + // Serialize the map as an array of (String, String) pairs + execArgs = Utils.serialize(props.toArray) } - // Serialize the map as an array of (String, String) pairs - return Utils.serialize(props.toArray) + return execArgs } override def offerRescinded(d: SchedulerDriver, o: OfferID) {} @@ -216,7 +218,7 @@ private[spark] class MesosSchedulerBackend( return MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) - .setExecutor(executorInfo) + .setExecutor(createExecutorInfo(slaveId)) .setName(task.name) .addResources(cpuResource) .setData(ByteString.copyFrom(task.serializedTask)) |