aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala7
1 files changed, 7 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f7790fccc6..c3159188d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -90,6 +90,8 @@ private[spark] class TaskSchedulerImpl(
// Number of tasks running on each executor
private val executorIdToTaskCount = new HashMap[String, Int]
+ def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
+
// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
protected val executorsByHost = new HashMap[String, HashSet[String]]
@@ -569,6 +571,11 @@ private[spark] class TaskSchedulerImpl(
return
}
while (!backend.isReady) {
+ // Might take a while for backend to be ready if it is waiting on resources.
+ if (sc.stopped.get) {
+ // For example: the master removes the application for some reason
+ throw new IllegalStateException("Spark context stopped while waiting for backend")
+ }
synchronized {
this.wait(100)
}