author     Charles Reiss <charles@eecs.berkeley.edu>  2013-01-29 14:03:05 -0800
committer  Charles Reiss <charles@eecs.berkeley.edu>  2013-01-30 09:38:57 -0800
commit     f7de6978c14a331683e4a341fccd6e4c5e9fa523 (patch)
tree       9079d2f2cd45228030f77ba4f265a9f1160c6d1a /core
parent     64ba6a8c2c5f46e6de6deb6a6fd576a55cb3b198 (diff)
Use Mesos ExecutorIDs to hold SlaveIDs. Then we can safely use
the Mesos ExecutorID as a Spark ExecutorID.
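
For reference, a minimal sketch of the ID scheme this commit moves to (the object and method names below are illustrative, not part of the patch): the scheduler now builds each Mesos ExecutorInfo with the slave ID as its ExecutorID, and the executor backend joins the slave ID and the Mesos executor ID to form the Spark executor ID.

    // Illustrative only: mirrors how the executor backend composes the
    // Spark executor ID after this commit.
    object ExecutorIdSketch {
      def sparkExecutorId(slaveId: String, mesosExecutorId: String): String =
        slaveId + "-" + mesosExecutorId
    }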
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/executor/MesosExecutorBackend.scala          6
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala  30
2 files changed, 21 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 1ef88075ad..b981b26916 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -32,7 +32,11 @@ private[spark] class MesosExecutorBackend(executor: Executor)
     logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties)
+    executor.initialize(
+      slaveInfo.getId.getValue + "-" + executorInfo.getExecutorId.getValue,
+      slaveInfo.getHostname,
+      properties
+    )
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index f3467db86b..eab1c60e0b 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -51,7 +51,7 @@ private[spark] class MesosSchedulerBackend(
   val taskIdToSlaveId = new HashMap[Long, String]
 
   // An ExecutorInfo for our tasks
-  var executorInfo: ExecutorInfo = null
+  var execArgs: Array[Byte] = null
 
   override def start() {
     synchronized {
@@ -70,12 +70,11 @@ private[spark] class MesosSchedulerBackend(
         }
       }.start()
 
-      executorInfo = createExecutorInfo()
       waitForRegister()
     }
   }
 
-  def createExecutorInfo(): ExecutorInfo = {
+  def createExecutorInfo(execId: String): ExecutorInfo = {
     val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
@@ -97,7 +96,7 @@ private[spark] class MesosSchedulerBackend(
       .setEnvironment(environment)
       .build()
     ExecutorInfo.newBuilder()
-      .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
      .setCommand(command)
      .setData(ByteString.copyFrom(createExecArg()))
      .addResources(memory)
@@ -109,17 +108,20 @@ private[spark] class MesosSchedulerBackend(
    * containing all the spark.* system properties in the form of (String, String) pairs.
    */
   private def createExecArg(): Array[Byte] = {
-    val props = new HashMap[String, String]
-    val iterator = System.getProperties.entrySet.iterator
-    while (iterator.hasNext) {
-      val entry = iterator.next
-      val (key, value) = (entry.getKey.toString, entry.getValue.toString)
-      if (key.startsWith("spark.")) {
-        props(key) = value
+    if (execArgs == null) {
+      val props = new HashMap[String, String]
+      val iterator = System.getProperties.entrySet.iterator
+      while (iterator.hasNext) {
+        val entry = iterator.next
+        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+        if (key.startsWith("spark.")) {
+          props(key) = value
+        }
       }
+      // Serialize the map as an array of (String, String) pairs
+      execArgs = Utils.serialize(props.toArray)
     }
-    // Serialize the map as an array of (String, String) pairs
-    return Utils.serialize(props.toArray)
+    return execArgs
   }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
@@ -216,7 +218,7 @@ private[spark] class MesosSchedulerBackend(
     return MesosTaskInfo.newBuilder()
       .setTaskId(taskId)
       .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setExecutor(executorInfo)
+      .setExecutor(createExecutorInfo(slaveId))
       .setName(task.name)
       .addResources(cpuResource)
       .setData(ByteString.copyFrom(task.serializedTask))