Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 70
1 file changed, 45 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b7919efc4b..8896391f97 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -43,24 +44,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
extends ExecutorAllocationClient with SchedulerBackend with Logging
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
- var totalCoreCount = new AtomicInteger(0)
+ protected val totalCoreCount = new AtomicInteger(0)
// Total number of executors that are currently registered
- var totalRegisteredExecutors = new AtomicInteger(0)
- val conf = scheduler.sc.conf
+ protected val totalRegisteredExecutors = new AtomicInteger(0)
+ protected val conf = scheduler.sc.conf
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
- var minRegisteredRatio =
+ private val _minRegisteredRatio =
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
- val maxRegisteredWaitingTimeMs =
+ private val maxRegisteredWaitingTimeMs =
conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
- val createTime = System.currentTimeMillis()
+ private val createTime = System.currentTimeMillis()
+ // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
+ // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
+ // must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
+ // only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
+ // `CoarseGrainedSchedulerBackend.this`.
private val executorDataMap = new HashMap[String, ExecutorData]
// Number of executors requested from the cluster manager that have not registered yet
+ @GuardedBy("CoarseGrainedSchedulerBackend.this")
private var numPendingExecutors = 0
private val listenerBus = scheduler.sc.listenerBus
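Note: the added comment and the @GuardedBy annotations spell out the locking rule for the driver-side bookkeeping. A minimal, self-contained Scala sketch of that rule follows; the class and field names are illustrative, not the ones in this file.

import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.HashMap

// Hypothetical simplification of the documented rule: all writes, and all reads
// that happen outside the single-threaded RPC endpoint, take the lock on the
// backend instance; the endpoint thread itself may read without it.
class GuardedStateSketch {
  @GuardedBy("this")
  private val executorDataMap = new HashMap[String, Int] // executorId -> cores

  // Endpoint thread: the only place the map is mutated, still under the lock
  // so that readers on other threads observe a consistent view.
  def registerExecutor(executorId: String, cores: Int): Unit = this.synchronized {
    executorDataMap.put(executorId, cores)
  }

  // Any other thread (e.g. code requesting executors): must hold the lock.
  def totalRegisteredCores: Int = this.synchronized {
    executorDataMap.values.sum
  }
}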
@@ -68,20 +75,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors we have requested the cluster manager to kill that have not died yet; maps
// the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
// be considered an app-related failure).
+ @GuardedBy("CoarseGrainedSchedulerBackend.this")
private val executorsPendingToRemove = new HashMap[String, Boolean]
// A map to store hostname with its possible task number running on it
+ @GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
// The number of pending tasks which is locality required
+ @GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var localityAwareTasks = 0
- // Executors that have been lost, but for which we don't yet know the real exit reason.
- protected val executorsPendingLossReason = new HashSet[String]
+ // The current max executor ID, used when the ApplicationMaster re-registers
+ @volatile protected var currentExecutorIdCounter = 0
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
+ // Executors that have been lost, but for which we don't yet know the real exit reason.
+ protected val executorsPendingLossReason = new HashSet[String]
+
// If this DriverEndpoint is changed to support multiple threads,
// then this may need to be changed so that we don't share the serializer
// instance across threads
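Note: currentExecutorIdCounter is @volatile, unlike the other new fields that rely on the backend lock, because it is written on the driver endpoint thread but read elsewhere when the application master re-registers. A hypothetical sketch of that access pattern (names are illustrative, not from this patch):

// Bumped on the endpoint thread as executors register; read from another
// thread without the backend lock, so @volatile guarantees visibility.
class ExecutorIdCounterSketch {
  @volatile private var currentExecutorIdCounter = 0

  // Endpoint thread: remember the highest executor ID seen so far.
  def onExecutorRegistered(executorId: String): Unit = {
    val id = executorId.toInt
    if (currentExecutorIdCounter < id) {
      currentExecutorIdCounter = id
    }
  }

  // Other thread (e.g. after an application master re-registers): new executor
  // IDs must start strictly above anything already in use.
  def firstFreeExecutorId: Int = currentExecutorIdCounter + 1
}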
@@ -137,7 +150,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
if (executorDataMap.contains(executorId)) {
- context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
+ executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
+ context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
@@ -156,13 +170,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
+ if (currentExecutorIdCounter < executorId.toInt) {
+ currentExecutorIdCounter = executorId.toInt
+ }
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
+ executorRef.send(RegisteredExecutor(executorAddress.host))
// Note: some tests expect the reply to come after we put the executor in the map
- context.reply(RegisteredExecutor(executorAddress.host))
+ context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
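Note: the hunk above turns registration into a two-step handshake: context.reply(true) only acknowledges the RegisterExecutor request, and the real outcome (RegisteredExecutor or RegisterExecutorFailed) arrives afterwards via executorRef.send. A hypothetical, simplified sketch of the executor-side view of that handshake (the types below are stand-ins, not Spark's RPC API):

object RegistrationHandshakeSketch {
  sealed trait FromDriver
  final case class RegisteredExecutor(hostname: String) extends FromDriver
  final case class RegisterExecutorFailed(reason: String) extends FromDriver

  // `ack` is the Boolean reply to the RegisterExecutor ask; `outcome` is the
  // one-way message the driver sends once the executor is, or is not, added
  // to its bookkeeping.
  def onRegistrationMessages(ack: Boolean, outcome: FromDriver): Unit = {
    require(ack, "driver did not acknowledge the registration request")
    outcome match {
      case RegisteredExecutor(host) => println(s"registered, reachable via $host")
      case RegisterExecutorFailed(reason) => sys.error(s"registration failed: $reason")
    }
  }
}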
@@ -255,7 +273,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Remove a disconnected slave from the cluster
- def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+ private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
@@ -307,7 +325,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
var driverEndpoint: RpcEndpointRef = null
- val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
+ protected def minRegisteredRatio: Double = _minRegisteredRatio
override def start() {
val properties = new ArrayBuffer[(String, String)]
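Note: the private _minRegisteredRatio value plus the protected minRegisteredRatio accessor let a cluster-manager-specific backend substitute its own default ratio while still honouring an explicit user setting. A hypothetical subclass sketch of that pattern (the class name and the 0.8 default are assumptions, not part of this patch):

// Hypothetical subclass, assumed to live in the same package so the Spark
// types resolve; not code from this patch.
private[spark] class RatioOverrideBackendSketch(
    scheduler: TaskSchedulerImpl,
    rpcEnv: RpcEnv)
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  // Prefer the user's explicit setting, otherwise wait for 80% of resources.
  override protected def minRegisteredRatio: Double =
    if (conf.contains("spark.scheduler.minRegisteredResourcesRatio")) {
      super.minRegisteredRatio
    } else {
      0.8
    }
}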
@@ -356,20 +375,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
- * be called in the yarn-client mode when AM re-registers after a failure, also dynamic
- * allocation is enabled.
+ * be called in the yarn-client mode when AM re-registers after a failure.
* */
protected def reset(): Unit = synchronized {
- if (Utils.isDynamicAllocationEnabled(conf)) {
- numPendingExecutors = 0
- executorsPendingToRemove.clear()
-
- // Remove all the lingering executors that should be removed but not yet. The reason might be
- // because (1) disconnected event is not yet received; (2) executors die silently.
- executorDataMap.toMap.foreach { case (eid, _) =>
- driverEndpoint.askWithRetry[Boolean](
- RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
- }
+ numPendingExecutors = 0
+ executorsPendingToRemove.clear()
+
+ // Remove all the lingering executors that should be removed but not yet. The reason might be
+ // because (1) disconnected event is not yet received; (2) executors die silently.
+ executorDataMap.toMap.foreach { case (eid, _) =>
+ driverEndpoint.askWithRetry[Boolean](
+ RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
}
}
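Note: with the dynamic-allocation guard removed, reset() now always clears the pending and to-be-removed bookkeeping and asks the endpoint to drop stale executors. A hypothetical sketch of the call site the updated doc comment describes, i.e. a yarn-client backend reacting to a second ApplicationMaster registration (names are illustrative):

// Hypothetical helper, not from this patch: detect an AM re-registration and
// only then reset the backend's executor bookkeeping.
class AmReRegistrationSketch(resetBackend: () => Unit) {
  private var amRegisteredBefore = false

  def onAmRegistered(): Unit = synchronized {
    if (amRegisteredBefore) {
      // A second registration means the previous AM failed; drop stale state.
      resetBackend()
    }
    amRegisteredBefore = true
  }
}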
@@ -414,7 +430,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Return the number of executors currently registered with this backend.
*/
- def numExistingExecutors: Int = executorDataMap.size
+ private def numExistingExecutors: Int = executorDataMap.size
+
+ override def getExecutorIds(): Seq[String] = {
+ executorDataMap.keySet.toSeq
+ }
/**
* Request an additional number of executors from the cluster manager.