author     Grace <jie.huang@intel.com>              2015-12-18 16:04:42 -0800
committer  Andrew Or <andrew@databricks.com>        2015-12-18 16:04:42 -0800
commit     60da0e11f6724d86df16795a7a1166879215d547 (patch)
tree       a03dbe4fc5a55a507ba1530b60ff51180c84e7a3 /core
parent     2377b707f25449f4557bf048bb384c743d9008e5 (diff)
[SPARK-9552] Return "false" while nothing to kill in killExecutors
In the discussion on SPARK-9552 we proposed a force kill in `killExecutors`. But if there is nothing to kill, it still returns true (acknowledgement), and the executor(s) that are not eligible to be killed end up added to the pendingToRemove list for further action. This patch changes the return semantics: if there is nothing to kill, we return "false", so those non-eligible executors are never added to the pendingToRemove list.

vanzin andrewor14 As a follow-up to PR #7888, please let me know your comments.

Author: Grace <jie.huang@intel.com>
Author: Jie Huang <hjie@fosun.com>
Author: Andrew Or <andrew@databricks.com>

Closes #9796 from GraceH/emptyPendingToRemove.
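For readers skimming the diff below, the behavioral change in a nutshell: a kill request is acknowledged only when at least one executor is actually submitted for killing. The following is a minimal, self-contained Scala sketch of the old versus new return contract; the object name and the doKill stand-in are hypothetical, not Spark's actual classes.

object KillSemanticsSketch {
  // Stand-in for the cluster-manager RPC; assume it acknowledges any real request.
  private def doKill(ids: Seq[String]): Boolean = true

  // Old behavior: an empty kill list was still "acknowledged".
  def killExecutorsOld(toKill: Seq[String]): Boolean = doKill(toKill)

  // New behavior (this patch): nothing to kill means no acknowledgement.
  def killExecutorsNew(toKill: Seq[String]): Boolean = toKill.nonEmpty && doKill(toKill)

  def main(args: Array[String]): Unit = {
    assert(killExecutorsOld(Seq.empty))      // true: the bug this patch fixes
    assert(!killExecutorsNew(Seq.empty))     // false: nothing was actually killed
    assert(killExecutorsNew(Seq("exec-1")))  // true: a real kill was submitted
  }
}

On the caller side (ExecutorAllocationManager), the executor is added to executorsPendingToRemove only when the backend returns true, so with the new semantics an ineligible executor never enters that set.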
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                       |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala         | 29
3 files changed, 24 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6176e25898..4926cafaed 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -423,7 +423,8 @@ private[spark] class ExecutorAllocationManager(
executorsPendingToRemove.add(executorId)
true
} else {
- logWarning(s"Unable to reach the cluster manager to kill executor $executorId!")
+ logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
+ s"or no executor eligible to kill!")
false
}
}
@@ -524,7 +525,6 @@ private[spark] class ExecutorAllocationManager(
private def onExecutorBusy(executorId: String): Unit = synchronized {
logDebug(s"Clearing idle timer for $executorId because it is now running a task")
removeTimes.remove(executorId)
- executorsPendingToRemove.remove(executorId)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7efe16749e..2279e8cad7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -471,7 +471,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Request that the cluster manager kill the specified executors.
- * @return whether the kill request is acknowledged.
+ * @return whether the kill request is acknowledged. If the list of executors to kill is
+ * empty, this method returns false.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
killExecutors(executorIds, replace = false, force = false)
@@ -487,7 +488,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones
* @param force whether to force kill busy executors
- * @return whether the kill request is acknowledged.
+ * @return whether the kill request is acknowledged. If the list of executors to kill is
+ * empty, this method returns false.
*/
final def killExecutors(
executorIds: Seq[String],
@@ -516,7 +518,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
numPendingExecutors += knownExecutors.size
}
- doKillExecutors(executorsToKill)
+ executorsToKill.nonEmpty && doKillExecutors(executorsToKill)
}
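One detail worth noting about the new expression: Scala's && short-circuits, so when the filtered list is empty, doKillExecutors (the RPC to the cluster manager) is never invoked at all and the method returns false without a network round trip. A tiny REPL-style illustration, with fakeRpc as an illustrative stand-in:

def fakeRpc(ids: Seq[String]): Boolean = {
  println("RPC issued")  // never printed below, because the left-hand side is false
  true
}
val toKill = Seq.empty[String]
val acknowledged = toKill.nonEmpty && fakeRpc(toKill)  // fakeRpc is skipped
assert(!acknowledged)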
/**
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 2fa795f846..314517d296 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -365,7 +365,7 @@ class StandaloneDynamicAllocationSuite
val executors = getExecutorIds(sc)
assert(executors.size === 2)
assert(sc.killExecutor(executors.head))
- assert(sc.killExecutor(executors.head))
+ assert(!sc.killExecutor(executors.head))
val apps = getApplications()
assert(apps.head.executors.size === 1)
// The limit should not be lowered twice
@@ -386,23 +386,28 @@ class StandaloneDynamicAllocationSuite
// the driver refuses to kill executors it does not know about
syncExecutors(sc)
val executors = getExecutorIds(sc)
+ val executorIdsBefore = executors.toSet
assert(executors.size === 2)
- // kill executor 1, and replace it
+ // kill and replace an executor
assert(sc.killAndReplaceExecutor(executors.head))
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.head.executors.size === 2)
+ val executorIdsAfter = getExecutorIds(sc).toSet
+ // make sure the executor was killed and replaced
+ assert(executorIdsBefore != executorIdsAfter)
}
- var apps = getApplications()
- // kill executor 1
- assert(sc.killExecutor(executors.head))
- apps = getApplications()
- assert(apps.head.executors.size === 2)
- assert(apps.head.getExecutorLimit === 2)
- // kill executor 2
- assert(sc.killExecutor(executors(1)))
- apps = getApplications()
+ // killing the old executor (already killed and replaced) should fail
+ assert(!sc.killExecutor(executors.head))
+
+ // refresh executors list
+ val newExecutors = getExecutorIds(sc)
+ syncExecutors(sc)
+
+ // kill newly created executor and do not replace it
+ assert(sc.killExecutor(newExecutors(1)))
+ val apps = getApplications()
assert(apps.head.executors.size === 1)
assert(apps.head.getExecutorLimit === 1)
}
@@ -430,7 +435,7 @@ class StandaloneDynamicAllocationSuite
val executorIdToTaskCount = taskScheduler invokePrivate getMap()
executorIdToTaskCount(executors.head) = 1
// kill the busy executor without force; this should fail
- assert(killExecutor(sc, executors.head, force = false))
+ assert(!killExecutor(sc, executors.head, force = false))
apps = getApplications()
assert(apps.head.executors.size === 2)
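The flipped assertion follows directly from the new contract: without force, the busy executor is filtered out of the kill list, the list comes back empty, and killExecutors now reports false instead of acknowledging a no-op. A hedged sketch of that eligibility filter, simplified from the backend logic (isBusy is a stand-in predicate, not Spark's exact code):

// Busy executors are eligible for killing only when force = true.
def eligibleToKill(ids: Seq[String], isBusy: String => Boolean, force: Boolean): Seq[String] =
  ids.filter(id => force || !isBusy(id))

val busy = Set("exec-1")
// Without force the list is empty, so the kill request returns false.
assert(eligibleToKill(Seq("exec-1"), busy, force = false).isEmpty)
// With force the busy executor is eligible and the kill is acknowledged.
assert(eligibleToKill(Seq("exec-1"), busy, force = true) == Seq("exec-1"))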