aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala29
3 files changed, 24 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6176e25898..4926cafaed 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -423,7 +423,8 @@ private[spark] class ExecutorAllocationManager(
executorsPendingToRemove.add(executorId)
true
} else {
- logWarning(s"Unable to reach the cluster manager to kill executor $executorId!")
+ logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
+ s"or no executor eligible to kill!")
false
}
}
@@ -524,7 +525,6 @@ private[spark] class ExecutorAllocationManager(
private def onExecutorBusy(executorId: String): Unit = synchronized {
logDebug(s"Clearing idle timer for $executorId because it is now running a task")
removeTimes.remove(executorId)
- executorsPendingToRemove.remove(executorId)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7efe16749e..2279e8cad7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -471,7 +471,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Request that the cluster manager kill the specified executors.
- * @return whether the kill request is acknowledged.
+ * @return whether the kill request is acknowledged. If list to kill is empty, it will return
+ * false.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
killExecutors(executorIds, replace = false, force = false)
@@ -487,7 +488,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones
* @param force whether to force kill busy executors
- * @return whether the kill request is acknowledged.
+ * @return whether the kill request is acknowledged. If list to kill is empty, it will return
+ * false.
*/
final def killExecutors(
executorIds: Seq[String],
@@ -516,7 +518,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
numPendingExecutors += knownExecutors.size
}
- doKillExecutors(executorsToKill)
+ !executorsToKill.isEmpty && doKillExecutors(executorsToKill)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 2fa795f846..314517d296 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -365,7 +365,7 @@ class StandaloneDynamicAllocationSuite
val executors = getExecutorIds(sc)
assert(executors.size === 2)
assert(sc.killExecutor(executors.head))
- assert(sc.killExecutor(executors.head))
+ assert(!sc.killExecutor(executors.head))
val apps = getApplications()
assert(apps.head.executors.size === 1)
// The limit should not be lowered twice
@@ -386,23 +386,28 @@ class StandaloneDynamicAllocationSuite
// the driver refuses to kill executors it does not know about
syncExecutors(sc)
val executors = getExecutorIds(sc)
+ val executorIdsBefore = executors.toSet
assert(executors.size === 2)
- // kill executor 1, and replace it
+ // kill and replace an executor
assert(sc.killAndReplaceExecutor(executors.head))
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.head.executors.size === 2)
+ val executorIdsAfter = getExecutorIds(sc).toSet
+ // make sure the executor was killed and replaced
+ assert(executorIdsBefore != executorIdsAfter)
}
- var apps = getApplications()
- // kill executor 1
- assert(sc.killExecutor(executors.head))
- apps = getApplications()
- assert(apps.head.executors.size === 2)
- assert(apps.head.getExecutorLimit === 2)
- // kill executor 2
- assert(sc.killExecutor(executors(1)))
- apps = getApplications()
+ // kill old executor (which is killedAndReplaced) should fail
+ assert(!sc.killExecutor(executors.head))
+
+ // refresh executors list
+ val newExecutors = getExecutorIds(sc)
+ syncExecutors(sc)
+
+ // kill newly created executor and do not replace it
+ assert(sc.killExecutor(newExecutors(1)))
+ val apps = getApplications()
assert(apps.head.executors.size === 1)
assert(apps.head.getExecutorLimit === 1)
}
@@ -430,7 +435,7 @@ class StandaloneDynamicAllocationSuite
val executorIdToTaskCount = taskScheduler invokePrivate getMap()
executorIdToTaskCount(executors.head) = 1
// kill the busy executor without force; this should fail
- assert(killExecutor(sc, executors.head, force = false))
+ assert(!killExecutor(sc, executors.head, force = false))
apps = getApplications()
assert(apps.head.executors.size === 2)