author     Shixiong Zhu <shixiong@databricks.com>    2016-04-05 22:32:37 -0700
committer  Andrew Or <andrew@databricks.com>         2016-04-05 22:32:37 -0700
commit     48467f4eb02209a884adbcf052670a057a75fcbd (patch)
tree       e64b8b6f4ef8565540c1e2600c945f34c580fb33
parent     adbfdb878dd1029738db3d1955d08b33de1aa8a9 (diff)
[SPARK-14416][CORE] Add thread-safe comments for CoarseGrainedSchedulerBackend's fields
## What changes were proposed in this pull request?

While reviewing #12078, I found that most of CoarseGrainedSchedulerBackend's mutable fields don't have any comments about their thread-safety assumptions, which makes it hard to figure out which parts of the code need to be protected by the lock. This PR adds comments/annotations for those fields and also tightens the access modifiers on some of them.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12188 from zsxwing/comments.
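As a rough illustration of the convention this commit documents (hypothetical class and field names, not the actual Spark code): a field annotated with @GuardedBy("this") is only read or written while holding the object's monitor. The sketch assumes the JSR-305 javax.annotation.concurrent annotations are on the classpath.

```scala
import javax.annotation.concurrent.GuardedBy

// Hypothetical backend-like class illustrating the convention: the annotated field
// is only touched while synchronized on `this`.
class TrackerSketch {
  @GuardedBy("this")
  private var pendingExecutors = 0

  // All mutation happens inside a synchronized block on `this`.
  def addPending(n: Int): Unit = synchronized {
    pendingExecutors += n
  }

  // Reads from other threads also take the lock.
  def pendingCount: Int = synchronized { pendingExecutors }
}
```

@GuardedBy is documentation only; nothing enforces it at compile time or runtime, which is why the patch pairs the annotations with comments spelling out the locking rules.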
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala  37
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala            9
2 files changed, 30 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 70470cc6d2..f71bfd489d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -43,24 +44,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
extends ExecutorAllocationClient with SchedulerBackend with Logging
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
- var totalCoreCount = new AtomicInteger(0)
+ protected val totalCoreCount = new AtomicInteger(0)
// Total number of executors that are currently registered
- var totalRegisteredExecutors = new AtomicInteger(0)
- val conf = scheduler.sc.conf
+ protected val totalRegisteredExecutors = new AtomicInteger(0)
+ protected val conf = scheduler.sc.conf
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
- var minRegisteredRatio =
+ private val _minRegisteredRatio =
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
- val maxRegisteredWaitingTimeMs =
+ private val maxRegisteredWaitingTimeMs =
conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
- val createTime = System.currentTimeMillis()
+ private val createTime = System.currentTimeMillis()
+ // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
+ // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
+ // must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
+ // only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
+ // `CoarseGrainedSchedulerBackend.this`.
private val executorDataMap = new HashMap[String, ExecutorData]
// Number of executors requested from the cluster manager that have not registered yet
+ @GuardedBy("CoarseGrainedSchedulerBackend.this")
private var numPendingExecutors = 0
private val listenerBus = scheduler.sc.listenerBus
@@ -68,23 +75,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors we have requested the cluster manager to kill that have not died yet; maps
// the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
// be considered an app-related failure).
+ @GuardedBy("CoarseGrainedSchedulerBackend.this")
private val executorsPendingToRemove = new HashMap[String, Boolean]
// A map to store hostname with its possible task number running on it
+ @GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
// The number of pending tasks which is locality required
+ @GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var localityAwareTasks = 0
- // Executors that have been lost, but for which we don't yet know the real exit reason.
- protected val executorsPendingLossReason = new HashSet[String]
-
// The num of current max ExecutorId used to re-register appMaster
- protected var currentExecutorIdCounter = 0
+ @volatile protected var currentExecutorIdCounter = 0
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
+ // Executors that have been lost, but for which we don't yet know the real exit reason.
+ protected val executorsPendingLossReason = new HashSet[String]
+
// If this DriverEndpoint is changed to support multiple threads,
// then this may need to be changed so that we don't share the serializer
// instance across threads
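The hunk above marks currentExecutorIdCounter as @volatile, presumably because it is updated from the endpoint's message handling but read from other threads without taking the backend lock, and it moves executorsPendingLossReason into DriverEndpoint, which extends ThreadSafeRpcEndpoint and therefore handles messages one at a time. A minimal sketch of the @volatile idiom, with hypothetical names:

```scala
// Hypothetical sketch: a counter written by a single message-handling thread and
// read by other threads. @volatile guarantees visibility of the latest write, but
// not atomicity of read-modify-write, so only one thread may perform the updates.
class IdCounterSketch {
  @volatile private var maxSeenId = 0

  // Called only from the (single-threaded) message handler.
  def recordId(id: Int): Unit = {
    if (id > maxSeenId) maxSeenId = id
  }

  // Safe to call from any thread; it sees the most recent write.
  def current: Int = maxSeenId
}
```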
@@ -261,7 +271,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Remove a disconnected slave from the cluster
- def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+ private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
@@ -313,7 +323,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
var driverEndpoint: RpcEndpointRef = null
- val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
+ protected def minRegisteredRatio: Double = _minRegisteredRatio
override def start() {
val properties = new ArrayBuffer[(String, String)]
@@ -417,7 +428,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Return the number of executors currently registered with this backend.
*/
- def numExistingExecutors: Int = executorDataMap.size
+ private def numExistingExecutors: Int = executorDataMap.size
/**
* Request an additional number of executors from the cluster manager.
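The comment added above executorDataMap describes an asymmetric rule: code running inside DriverEndpoint.receive/receiveAndReply may read the map without a lock because the endpoint is single-threaded, while any access from other threads, and every modification, must hold the CoarseGrainedSchedulerBackend.this monitor. A hypothetical sketch of what a reader outside the endpoint has to do (illustrative names, not the actual Spark API):

```scala
import scala.collection.mutable.HashMap

// Hypothetical sketch of the asymmetric access rule described in the comment:
// the single-threaded endpoint may read the map freely, everyone else must
// synchronize on the backend instance.
class BackendSketch {
  private case class ExecutorInfo(totalCores: Int)

  // Mutated only by the single-threaded endpoint, and only inside synchronized blocks.
  private val executorDataMap = new HashMap[String, ExecutorInfo]

  // Called from arbitrary threads, so it must take the backend lock before reading.
  def totalCores: Int = synchronized {
    executorDataMap.values.map(_.totalCores).sum
  }
}
```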
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 5aeaf44732..8720ee57fe 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -39,9 +39,12 @@ private[spark] abstract class YarnSchedulerBackend(
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
- if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
- minRegisteredRatio = 0.8
- }
+ override val minRegisteredRatio =
+ if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+ 0.8
+ } else {
+ super.minRegisteredRatio
+ }
protected var totalExpectedExecutors = 0
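The YarnSchedulerBackend change replaces reassignment of a mutable var after construction with an override: the base class keeps the configured value private and exposes it through a protected def, and the YARN subclass overrides it with an immutable val, so the effective ratio is fixed at construction time and needs no synchronization. A generic sketch of that pattern, with hypothetical class names:

```scala
// Hypothetical sketch of the override pattern used for minRegisteredRatio.
class BaseBackendSketch(configured: Option[Double]) {
  // Keep the raw configured value private; cap it at 1.0 like the real code does.
  private val _minRegisteredRatio: Double = math.min(1.0, configured.getOrElse(0.0))

  // Subclasses see the ratio only through this def, which they may override.
  protected def minRegisteredRatio: Double = _minRegisteredRatio
}

class YarnBackendSketch(configured: Option[Double]) extends BaseBackendSketch(configured) {
  // On YARN, default to 0.8 unless the user explicitly configured a ratio.
  override val minRegisteredRatio: Double =
    if (configured.isEmpty) 0.8 else super.minRegisteredRatio
}
```

In Scala a val may override a parameterless def, which is what makes this pattern work while keeping the field effectively immutable.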