author     Dale <tigerquoll@outlook.com>  2014-09-27 22:08:10 -0700
committer  Reynold Xin <rxin@apache.org>  2014-09-27 22:08:10 -0700
commit     9966d1a8aaed3d8cfed93855959705ea3c677215 (patch)
tree       59849a51ef39cc0cfb7be65065aa81fbcc9f90c6
parent     248232936e1bead7f102e59eb8faf3126c582d9d (diff)
SPARK-CORE [SPARK-3651] Group common CoarseGrainedSchedulerBackend variables together
from [SPARK-3651]

In CoarseGrainedSchedulerBackend, we have:

    private val executorActor = new HashMap[String, ActorRef]
    private val executorAddress = new HashMap[String, Address]
    private val executorHost = new HashMap[String, String]
    private val freeCores = new HashMap[String, Int]
    private val totalCores = new HashMap[String, Int]

We only ever put / remove stuff from these maps together. It would simplify the code if we consolidate these all into one map as we have done in JobProgressListener in https://issues.apache.org/jira/browse/SPARK-2299.

Author: Dale <tigerquoll@outlook.com>

Closes #2533 from tigerquoll/SPARK-3651 and squashes the following commits:

d1be0a9 [Dale] [SPARK-3651] implemented suggested changes. Changed a reference from executorInfo to executorData to be consistent with other usages
6890663 [Dale] [SPARK-3651] implemented suggested changes
7d671cf [Dale] [SPARK-3651] Grouped variables under a ExecutorDataObject, and reference them via a map entry as they are all retrieved under the same key
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 68
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala                   | 38
2 files changed, 68 insertions(+), 38 deletions(-)
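
The refactoring follows a common pattern: several parallel mutable maps keyed by
the same ID are collapsed into a single map whose value object groups the related
fields, so registration, update, and removal each touch exactly one map entry. A
minimal sketch of that pattern (hypothetical names, not the actual Spark classes):

    import scala.collection.mutable.HashMap

    // Hypothetical illustration of the grouping pattern, not the Spark code itself.
    object GroupedStateSketch {
      // One value class replaces the parallel host/freeSlots/totalSlots maps
      // that previously all shared the same key.
      class WorkerInfo(
          val host: String,    // where the worker runs
          var freeSlots: Int,  // mutable: changes as work is assigned/finished
          val totalSlots: Int) // fixed capacity

      private val workers = new HashMap[String, WorkerInfo]

      def register(id: String, host: String, slots: Int): Unit =
        workers.put(id, new WorkerInfo(host, slots, slots))

      def finishTask(id: String): Unit =
        workers.get(id).foreach(w => w.freeSlots += 1)

      def remove(id: String): Option[Int] =
        workers.remove(id).map(_.totalSlots) // capacity to give back, if known
    }
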
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9a0cb1c6c6..59e15edc75 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -62,15 +62,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
val createTime = System.currentTimeMillis()
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
-
override protected def log = CoarseGrainedSchedulerBackend.this.log
-
- private val executorActor = new HashMap[String, ActorRef]
- private val executorAddress = new HashMap[String, Address]
- private val executorHost = new HashMap[String, String]
- private val freeCores = new HashMap[String, Int]
- private val totalCores = new HashMap[String, Int]
private val addressToExecutorId = new HashMap[Address, String]
+ private val executorDataMap = new HashMap[String, ExecutorData]
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -85,16 +79,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
def receiveWithLogging = {
case RegisterExecutor(executorId, hostPort, cores) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
- if (executorActor.contains(executorId)) {
+ if (executorDataMap.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
} else {
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor
- executorActor(executorId) = sender
- executorHost(executorId) = Utils.parseHostPort(hostPort)._1
- totalCores(executorId) = cores
- freeCores(executorId) = cores
- executorAddress(executorId) = sender.path.address
+ executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address,
+ Utils.parseHostPort(hostPort)._1, cores, cores))
+
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
@@ -104,13 +96,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
- if (executorActor.contains(executorId)) {
- freeCores(executorId) += scheduler.CPUS_PER_TASK
- makeOffers(executorId)
- } else {
- // Ignoring the update since we don't know about the executor.
- val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
- logWarning(msg.format(taskId, state, sender, executorId))
+ executorDataMap.get(executorId) match {
+ case Some(executorInfo) =>
+ executorInfo.freeCores += scheduler.CPUS_PER_TASK
+ makeOffers(executorId)
+ case None =>
+ // Ignoring the update since we don't know about the executor.
+                logWarning(s"Ignored task status update ($taskId state $state) " +
+                  s"from unknown executor $sender with ID $executorId")
}
}
@@ -118,7 +111,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
makeOffers()
case KillTask(taskId, executorId, interruptThread) =>
- executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)
+ executorDataMap(executorId).executorActor ! KillTask(taskId, executorId, interruptThread)
case StopDriver =>
sender ! true
@@ -126,8 +119,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
case StopExecutors =>
logInfo("Asking each executor to shut down")
- for (executor <- executorActor.values) {
- executor ! StopExecutor
+ for ((_, executorData) <- executorDataMap) {
+ executorData.executorActor ! StopExecutor
}
sender ! true
@@ -138,6 +131,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
sender ! true
+
case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_,
"remote Akka client disassociated"))
@@ -149,13 +143,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Make fake resource offers on all executors
def makeOffers() {
launchTasks(scheduler.resourceOffers(
- executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+ executorDataMap.map {case (id, executorData) =>
+ new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toSeq))
}
// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
+ val executorData = executorDataMap(executorId)
launchTasks(scheduler.resourceOffers(
- Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+ Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
}
// Launch tasks returned by a set of resource offers
@@ -179,25 +175,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
}
}
else {
- freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
- executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+ val executorData = executorDataMap(task.executorId)
+ executorData.freeCores -= scheduler.CPUS_PER_TASK
+ executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
}
}
}
// Remove a disconnected slave from the cluster
def removeExecutor(executorId: String, reason: String) {
- if (executorActor.contains(executorId)) {
- logInfo("Executor " + executorId + " disconnected, so removing it")
- val numCores = totalCores(executorId)
- executorActor -= executorId
- executorHost -= executorId
- addressToExecutorId -= executorAddress(executorId)
- executorAddress -= executorId
- totalCores -= executorId
- freeCores -= executorId
- totalCoreCount.addAndGet(-numCores)
- scheduler.executorLost(executorId, SlaveLost(reason))
+ executorDataMap.get(executorId) match {
+ case Some(executorInfo) =>
+ executorDataMap -= executorId
+ totalCoreCount.addAndGet(-executorInfo.totalCores)
+ scheduler.executorLost(executorId, SlaveLost(reason))
+        case None => logError(s"Asked to remove non-existent executor $executorId")
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
new file mode 100644
index 0000000000..74a92985b6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import akka.actor.{Address, ActorRef}
+
+/**
+ * Grouping of data that is accessed by the CoarseGrainedSchedulerBackend. This class
+ * is stored in a Map keyed by the executor ID.
+ *
+ * @param executorActor The actorRef representing this executor
+ * @param executorAddress The network address of this executor
+ * @param executorHost The hostname that this executor is running on
+ * @param freeCores The current number of cores available for work on the executor
+ * @param totalCores The total number of cores available to the executor
+ */
+private[cluster] class ExecutorData(
+ val executorActor: ActorRef,
+ val executorAddress: Address,
+    val executorHost: String,
+ var freeCores: Int,
+ val totalCores: Int
+)
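
For context (not part of the commit), a hedged sketch of how a class like this is
meant to be used: stored as the value side of a single map keyed by executor ID,
so that all per-executor state is registered and removed together. The class
ExecutorRegistrySketch and its method names below are hypothetical.

    package org.apache.spark.scheduler.cluster

    import scala.collection.mutable.HashMap

    import akka.actor.ActorRef

    // Hypothetical usage sketch: all per-executor state travels together under
    // one key, so registration and removal are single map operations.
    private[cluster] class ExecutorRegistrySketch {
      private val executorDataMap = new HashMap[String, ExecutorData]

      def register(executorId: String, actor: ActorRef, host: String, cores: Int): Unit =
        executorDataMap.put(executorId,
          new ExecutorData(actor, actor.path.address, host, cores, cores))

      // Returns the removed executor's total cores so the caller can adjust
      // any aggregate counters.
      def remove(executorId: String): Option[Int] =
        executorDataMap.remove(executorId).map(_.totalCores)
    }
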