aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorHong Shen <hongshen@tencent.com>2015-02-26 18:43:23 -0800
committerAndrew Or <andrew@databricks.com>2015-02-26 18:43:23 -0800
commit18f2098433e0bfef9497bacd601fdf098ed03eab (patch)
tree34d00b026b1ad7af3cc51fdc1dd414ce47af3851 /core
parentfbc469473dd529eb72046186b85dd8fc2b7c5bb5 (diff)
downloadspark-18f2098433e0bfef9497bacd601fdf098ed03eab.tar.gz
spark-18f2098433e0bfef9497bacd601fdf098ed03eab.tar.bz2
spark-18f2098433e0bfef9497bacd601fdf098ed03eab.zip
[SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver
If a blockManager has not sent a heartbeat for more than 120s, BlockManagerMasterActor will remove it. But coarseGrainedSchedulerBackend can only remove an executor after a DisassociatedEvent. We should expireDeadHosts at HeartbeatReceiver. Author: Hong Shen <hongshen@tencent.com> Closes #4363 from shenh062326/my_change3 and squashes the following commits: 2c9a46a [Hong Shen] Change some code style. 1a042ff [Hong Shen] Change some code style. 2dc456e [Hong Shen] Change some code style. d221493 [Hong Shen] Fix test failed 7448ac6 [Hong Shen] A minor change in sparkContext and heartbeatReceiver b904aed [Hong Shen] Fix failed test 52725af [Hong Shen] Remove assert in SparkContext.killExecutors 5bedcb8 [Hong Shen] Remove assert in SparkContext.killExecutors a858fb5 [Hong Shen] A minor change in HeartbeatReceiver 3e221d9 [Hong Shen] A minor change in HeartbeatReceiver 6bab7aa [Hong Shen] Change a code style. 07952f3 [Hong Shen] Change configs name and code style. ce9257e [Hong Shen] Fix test failed bccd515 [Hong Shen] Fix test failed 8e77408 [Hong Shen] Fix test failed c1dfda1 [Hong Shen] Fix test failed e197e20 [Hong Shen] Fix test failed fb5df97 [Hong Shen] Remove ExpireDeadHosts in BlockManagerMessages b5c0441 [Hong Shen] Remove expireDeadHosts in BlockManagerMasterActor c922cb0 [Hong Shen] Add expireDeadHosts in HeartbeatReceiver
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala65
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala2
7 files changed, 79 insertions, 49 deletions
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 83ae57b7f1..69178da1a7 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,33 +17,86 @@
package org.apache.spark
-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}
+
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
import org.apache.spark.util.ActorLogReceive
/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not heartbeated for more than spark.network.timeout.
*/
private[spark] case class Heartbeat(
executorId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)
+private[spark] case object ExpireDeadHosts
+
private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
/**
* Lives in the driver to receive heartbeats from executors..
*/
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
extends Actor with ActorLogReceive with Logging {
+ // executor ID -> timestamp of when the last heartbeat from this executor was received
+ private val executorLastSeen = new mutable.HashMap[String, Long]
+
+  // How long an executor may go without a heartbeat before it is expired, in milliseconds.
+  // "spark.network.timeout" is read as seconds and scaled by 1000; the legacy key
+  // "spark.storage.blockManagerSlaveTimeoutMs" is already in milliseconds (its previous
+  // default was 120 * 1000), so it must NOT be scaled again.
+  private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout")
+    .map(_.toLong * 1000)
+    .getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000))
+
+  // How often ExpireDeadHosts is scheduled, in milliseconds. Same convention as above:
+  // "spark.network.timeoutInterval" is in seconds, while the legacy key
+  // "spark.storage.blockManagerTimeoutIntervalMs" is in milliseconds (previous default 60000).
+  private val checkTimeoutIntervalMs = sc.conf.getOption("spark.network.timeoutInterval")
+    .map(_.toLong * 1000)
+    .getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60 * 1000))
+
+ private var timeoutCheckingTask: Cancellable = null
+
+ override def preStart(): Unit = {
+ import context.dispatcher
+ timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+ checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
+ super.preStart()
+ }
+
override def receiveWithLogging = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
- val response = HeartbeatResponse(
- !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+ val unknownExecutor = !scheduler.executorHeartbeatReceived(
+ executorId, taskMetrics, blockManagerId)
+ val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+ executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
+ case ExpireDeadHosts =>
+ expireDeadHosts()
+ }
+
+  /**
+   * Expire every executor whose last recorded heartbeat is older than executorTimeoutMs.
+   * Each expired executor is reported to the scheduler as lost, killed through the
+   * SparkContext when dynamic allocation is supported, and dropped from executorLastSeen.
+   */
+  private def expireDeadHosts(): Unit = {
+    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    // Iterate over a snapshot: entries are removed from executorLastSeen inside the loop,
+    // and mutating a mutable.HashMap while traversing it is not guaranteed to be safe.
+    for ((executorId, lastSeenMs) <- executorLastSeen.toSeq) {
+      val elapsedMs = now - lastSeenMs
+      if (elapsedMs > executorTimeoutMs) {
+        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+          s"$elapsedMs ms exceeds timeout $executorTimeoutMs ms")
+        // The reason string must be interpolated (s"..."); without the prefix the scheduler
+        // would receive the literal text "${now - lastSeenMs}".
+        scheduler.executorLost(executorId,
+          SlaveLost(s"Executor heartbeat timed out after $elapsedMs ms"))
+        if (sc.supportDynamicAllocation) {
+          sc.killExecutor(executorId)
+        }
+        executorLastSeen.remove(executorId)
+      }
+    }
+  }
+
+ override def postStop(): Unit = {
+ if (timeoutCheckingTask != null) {
+ timeoutCheckingTask.cancel()
+ }
+ super.postStop()
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 930d4bea47..d3948d4e6d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
- Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
+ Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)
@@ -398,7 +398,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
if (dynamicAllocationEnabled) {
- assert(master.contains("yarn") || dynamicAllocationTesting,
+ assert(supportDynamicAllocation,
"Dynamic allocation of executors is currently only supported in YARN mode")
Some(new ExecutorAllocationManager(this, listenerBus, conf))
} else {
@@ -1123,6 +1123,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
/**
+ * Return whether dynamically adjusting the amount of resources allocated to
+ * this application is supported. This is currently only available for YARN.
+ */
+ private[spark] def supportDynamicAllocation =
+ master.contains("yarn") || dynamicAllocationTesting
+
+ /**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
*/
@@ -1155,7 +1162,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
- assert(master.contains("yarn") || dynamicAllocationTesting,
+ assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
@@ -1173,7 +1180,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
- assert(master.contains("yarn") || dynamicAllocationTesting,
+ assert(supportDynamicAllocation,
"Killing executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index f095915352..ed3418676e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,5 +73,9 @@ private[spark] trait TaskScheduler {
* @return An application ID
*/
def applicationId(): String = appId
-
+
+ /**
+ * Process a lost executor
+ */
+ def executorLost(executorId: String, reason: ExecutorLossReason): Unit
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 54f8fcfc41..7a9cf1c2e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -436,7 +436,7 @@ private[spark] class TaskSchedulerImpl(
}
}
- def executorLost(executorId: String, reason: ExecutorLossReason) {
+ override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None
synchronized {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 64133464d8..787b0f96be 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.duration._
-import akka.actor.{Actor, ActorRef, Cancellable}
+import akka.actor.{Actor, ActorRef}
import akka.pattern.ask
import org.apache.spark.{Logging, SparkConf, SparkException}
@@ -52,19 +52,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
private val akkaTimeout = AkkaUtils.askTimeout(conf)
- val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)
-
- val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)
-
- var timeoutCheckingTask: Cancellable = null
-
- override def preStart() {
- import context.dispatcher
- timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
- checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
- super.preStart()
- }
-
override def receiveWithLogging = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
register(blockManagerId, maxMemSize, slaveActor)
@@ -118,14 +105,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case StopBlockManagerMaster =>
sender ! true
- if (timeoutCheckingTask != null) {
- timeoutCheckingTask.cancel()
- }
context.stop(self)
- case ExpireDeadHosts =>
- expireDeadHosts()
-
case BlockManagerHeartbeat(blockManagerId) =>
sender ! heartbeatReceived(blockManagerId)
@@ -207,21 +188,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
logInfo(s"Removing block manager $blockManagerId")
}
- private def expireDeadHosts() {
- logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
- val now = System.currentTimeMillis()
- val minSeenTime = now - slaveTimeout
- val toRemove = new mutable.HashSet[BlockManagerId]
- for (info <- blockManagerInfo.values) {
- if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
- logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
- + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
- toRemove += info.blockManagerId
- }
- }
- toRemove.foreach(removeBlockManager)
- }
-
private def removeExecutor(execId: String) {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 3f32099d08..48247453ed 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -109,6 +109,4 @@ private[spark] object BlockManagerMessages {
extends ToBlockManagerMaster
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
- case object ExpireDeadHosts extends ToBlockManagerMaster
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4bf7f9e647..30119ce5d4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -96,6 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
+ override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}
/** Length of time to wait while draining listener events. */
@@ -386,6 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
override def defaultParallelism() = 2
override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
+ override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}
val noKillScheduler = new DAGScheduler(
sc,