aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorGrace <jie.huang@intel.com>2015-11-17 15:43:35 -0800
committerAndrew Or <andrew@databricks.com>2015-11-17 15:43:35 -0800
commit965245d087c18edc6c3d5baddeaf83163e32e330 (patch)
treeba3980a0975a8bbaa527d42892379056346f8318 /core
parent928d631625297857fb6998fbeb0696917fbfd60f (diff)
downloadspark-965245d087c18edc6c3d5baddeaf83163e32e330.tar.gz
spark-965245d087c18edc6c3d5baddeaf83163e32e330.tar.bz2
spark-965245d087c18edc6c3d5baddeaf83163e32e330.zip
[SPARK-9552] Add force control for killExecutors to avoid false killing for those busy executors
By using the dynamic allocation, sometimes it occurs false killing for those busy executors. Some executors with assignments will be killed because of being idle for enough time (say 60 seconds). The root cause is that the Task-Launch listener event is asynchronized. For example, some executors are under assigning tasks, but not sending out the listener notification yet. Meanwhile, the dynamic allocation's executor idle time is up (e.g., 60 seconds). It will trigger killExecutor event at the same time. 1. the timer expiration starts before the listener event arrives. 2. Then, the task is going to run on top of that killed/killing executor. It will lead to task failure finally. Here is the proposal to fix it. We can add the force control for killExecutor. If the force control is not set (i.e., false), we'd better to check if the executor under killing is idle or busy. If the current executor has some assignment, we should not kill that executor and return back false (to indicate killing failure). In dynamic allocation, we'd better to turn off force killing (i.e., force = false), we will meet killing failure if tries to kill a busy executor. And then, the executor timer won't be invalid. Later on, the task assignment event arrives, we can remove the idle timer accordingly. So that we can avoid false killing for those busy executors in dynamic allocation. For the rest of usages, the end users can decide if to use force killing or not by themselves. If to turn on that option, the killExecutor will do the action without any status checking. Author: Grace <jie.huang@intel.com> Author: Andrew Or <andrew@databricks.com> Author: Jie Huang <jie.huang@intel.com> Closes #7888 from GraceH/forcekill.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala13
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala52
5 files changed, 82 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b93536e653..6419218f47 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -509,6 +509,7 @@ private[spark] class ExecutorAllocationManager(
private def onExecutorBusy(executorId: String): Unit = synchronized {
logDebug(s"Clearing idle timer for $executorId because it is now running a task")
removeTimes.remove(executorId)
+ executorsPendingToRemove.remove(executorId)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4bbd0b038c..b5645b08f9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1461,7 +1461,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
override def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- b.killExecutors(executorIds)
+ b.killExecutors(executorIds, replace = false, force = true)
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
@@ -1499,7 +1499,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- b.killExecutors(Seq(executorId), replace = true)
+ b.killExecutors(Seq(executorId), replace = true, force = true)
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5f136690f4..bf0419db1f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl(
// Incrementing task IDs
val nextTaskId = new AtomicLong(0)
- // Which executor IDs we have executors on
- val activeExecutorIds = new HashSet[String]
+ // Number of tasks running on each executor
+ private val executorIdToTaskCount = new HashMap[String, Int]
// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
@@ -254,6 +254,7 @@ private[spark] class TaskSchedulerImpl(
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
+ executorIdToTaskCount(execId) += 1
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
@@ -282,7 +283,7 @@ private[spark] class TaskSchedulerImpl(
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
- activeExecutorIds += o.executorId
+ executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
@@ -331,7 +332,8 @@ private[spark] class TaskSchedulerImpl(
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
- if (activeExecutorIds.contains(execId)) {
+
+ if (executorIdToTaskCount.contains(execId)) {
removeExecutor(execId,
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
failedExecutor = Some(execId)
@@ -341,7 +343,11 @@ private[spark] class TaskSchedulerImpl(
case Some(taskSet) =>
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
- taskIdToExecutorId.remove(tid)
+ taskIdToExecutorId.remove(tid).foreach { execId =>
+ if (executorIdToTaskCount.contains(execId)) {
+ executorIdToTaskCount(execId) -= 1
+ }
+ }
}
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
@@ -462,7 +468,7 @@ private[spark] class TaskSchedulerImpl(
var failedExecutor: Option[String] = None
synchronized {
- if (activeExecutorIds.contains(executorId)) {
+ if (executorIdToTaskCount.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
removeExecutor(executorId, reason)
@@ -498,7 +504,8 @@ private[spark] class TaskSchedulerImpl(
* of any running tasks, since the loss reason defines whether we'll fail those tasks.
*/
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
- activeExecutorIds -= executorId
+ executorIdToTaskCount -= executorId
+
val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
execs -= executorId
@@ -535,7 +542,11 @@ private[spark] class TaskSchedulerImpl(
}
def isExecutorAlive(execId: String): Boolean = synchronized {
- activeExecutorIds.contains(execId)
+ executorIdToTaskCount.contains(execId)
+ }
+
+ def isExecutorBusy(execId: String): Boolean = synchronized {
+ executorIdToTaskCount.getOrElse(execId, -1) > 0
}
// By default, rack is unknown
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 3373caf0d1..6f0c910c00 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -453,7 +453,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @return whether the kill request is acknowledged.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
- killExecutors(executorIds, replace = false)
+ killExecutors(executorIds, replace = false, force = false)
}
/**
@@ -461,9 +461,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones
+ * @param force whether to force kill busy executors
* @return whether the kill request is acknowledged.
*/
- final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized {
+ final def killExecutors(
+ executorIds: Seq[String],
+ replace: Boolean,
+ force: Boolean): Boolean = synchronized {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
@@ -471,7 +475,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// If an executor is already pending to be removed, do not kill it again (SPARK-9795)
- val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
+ // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+ val executorsToKill = knownExecutors
+ .filter { id => !executorsPendingToRemove.contains(id) }
+ .filter { id => force || !scheduler.isExecutorBusy(id) }
executorsPendingToRemove ++= executorsToKill
// If we do not wish to replace the executors we kill, sync the target number of executors
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index d145e78834..2fa795f846 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,11 @@
package org.apache.spark.deploy
+import scala.collection.mutable
import scala.concurrent.duration._
import org.mockito.Mockito.{mock, when}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
@@ -29,6 +30,7 @@ import org.apache.spark.deploy.master.ApplicationInfo
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
@@ -38,7 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterE
class StandaloneDynamicAllocationSuite
extends SparkFunSuite
with LocalSparkContext
- with BeforeAndAfterAll {
+ with BeforeAndAfterAll
+ with PrivateMethodTester {
private val numWorkers = 2
private val conf = new SparkConf()
@@ -404,6 +407,41 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === 1)
}
+ test("disable force kill for busy executors (SPARK-9552)") {
+ sc = new SparkContext(appConf)
+ val appId = sc.applicationId
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
+ var apps = getApplications()
+ // sync executors between the Master and the driver, needed because
+ // the driver refuses to kill executors it does not know about
+ syncExecutors(sc)
+ val executors = getExecutorIds(sc)
+ assert(executors.size === 2)
+
+ // simulate running a task on the executor
+ val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
+ val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
+ val executorIdToTaskCount = taskScheduler invokePrivate getMap()
+ executorIdToTaskCount(executors.head) = 1
+ // kill the busy executor without force; this should fail
+ assert(killExecutor(sc, executors.head, force = false))
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+
+ // force kill busy executor
+ assert(killExecutor(sc, executors.head, force = true))
+ apps = getApplications()
+ // kill executor successfully
+ assert(apps.head.executors.size === 1)
+
+ }
+
// ===============================
// | Utility methods for testing |
// ===============================
@@ -455,6 +493,16 @@ class StandaloneDynamicAllocationSuite
sc.killExecutors(getExecutorIds(sc).take(n))
}
+ /** Kill the given executor, specifying whether to force kill it. */
+ private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
+ syncExecutors(sc)
+ sc.schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ b.killExecutors(Seq(executorId), replace = false, force)
+ case _ => fail("expected coarse grained scheduler")
+ }
+ }
+
/**
* Return a list of executor IDs belonging to this application.
*