aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorBertrand Bossy <bertrand.bossy@teralytics.net>2016-03-14 12:22:57 -0700
committerAndrew Or <andrew@databricks.com>2016-03-14 12:22:57 -0700
commit310981d49a332bd329303f610b150bbe02cf5f87 (patch)
treec0ee456d9ffa8923ce6d522071acf7bf12157b5d /core/src
parent07cb323e7a128b87ef265ddc66f033365d9de463 (diff)
downloadspark-310981d49a332bd329303f610b150bbe02cf5f87.tar.gz
spark-310981d49a332bd329303f610b150bbe02cf5f87.tar.bz2
spark-310981d49a332bd329303f610b150bbe02cf5f87.zip
[SPARK-12583][MESOS] Mesos shuffle service: Don't delete shuffle files before application has stopped
## Problem description: Mesos shuffle service is completely unusable since Spark 1.6.0 . The problem seems to occur since the move from akka to netty in the networking layer. Until now, a connection from the driver to each shuffle service was used as a signal for the shuffle service to determine, whether the driver is still running. Since 1.6.0, this connection is closed after spark.shuffle.io.connectionTimeout (or spark.network.timeout if the former is not set) due to it being idle. The shuffle service interprets this as a signal that the driver has stopped, despite the driver still being alive. Thus, shuffle files are deleted before the application has stopped. ### Context and analysis: spark shuffle fails with mesos after 2mins: https://issues.apache.org/jira/browse/SPARK-12583 External shuffle service broken w/ Mesos: https://issues.apache.org/jira/browse/SPARK-13159 This is a follow up on #11207 . ## What changes were proposed in this pull request? This PR adds a heartbeat signal from the Driver (in MesosExternalShuffleClient) to all registered external mesos shuffle service instances. In MesosExternalShuffleBlockHandler, a thread periodically checks whether a driver has timed out and cleans an application's shuffle files if this is the case. ## How was the this patch tested? This patch has been tested on a small mesos test cluster using the spark-shell. Log output from mesos shuffle service: ``` 16/02/19 15:13:45 INFO mesos.MesosExternalShuffleBlockHandler: Received registration request from app 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 (remote address /xxx.xxx.xxx.xxx:52391, heartbeat timeout 120000 ms). 16/02/19 15:13:47 INFO shuffle.ExternalShuffleBlockResolver: Registered executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=3} with ExecutorShuffleInfo{localDirs=[/foo/blockmgr-c84c0697-a3f9-4f61-9c64-4d3ee227c047], subDirsPerLocalDir=64, shuffleManager=sort} 16/02/19 15:13:47 INFO shuffle.ExternalShuffleBlockResolver: Registered executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=7} with ExecutorShuffleInfo{localDirs=[/foo/blockmgr-bf46497a-de80-47b9-88f9-563123b59e03], subDirsPerLocalDir=64, shuffleManager=sort} 16/02/19 15:16:02 INFO mesos.MesosExternalShuffleBlockHandler: Application 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 timed out. Removing shuffle files. 16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Application 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 removed, cleanupLocalDirs = true 16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=3}'s 1 local dirs 16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=7}'s 1 local dirs ``` Note: there are 2 executors running on this slave. Author: Bertrand Bossy <bertrand.bossy@teralytics.net> Closes #11272 from bbossy/SPARK-12583-mesos-shuffle-service-heartbeat.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala87
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala3
3 files changed, 65 insertions, 35 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
index 4172d924c8..c0f9129a42 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -17,69 +17,89 @@
package org.apache.spark.deploy.mesos
-import java.net.SocketAddress
import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import scala.collection.mutable
+import scala.collection.JavaConverters._
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
+import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
import org.apache.spark.network.util.TransportConf
+import org.apache.spark.util.ThreadUtils
/**
* An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
* It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
*/
-private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
+private[mesos] class MesosExternalShuffleBlockHandler(
+ transportConf: TransportConf,
+ cleanerIntervalS: Long)
extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
- // Stores a map of driver socket addresses to app ids
- private val connectedApps = new mutable.HashMap[SocketAddress, String]
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
+ .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS)
+
+ // Stores a map of app id to app state (timeout value and last heartbeat)
+ private val connectedApps = new ConcurrentHashMap[String, AppState]()
protected override def handleMessage(
message: BlockTransferMessage,
client: TransportClient,
callback: RpcResponseCallback): Unit = {
message match {
- case RegisterDriverParam(appId) =>
+ case RegisterDriverParam(appId, appState) =>
val address = client.getSocketAddress
- logDebug(s"Received registration request from app $appId (remote address $address).")
- if (connectedApps.contains(address)) {
- val existingAppId = connectedApps(address)
- if (!existingAppId.equals(appId)) {
- logError(s"A new app '$appId' has connected to existing address $address, " +
- s"removing previously registered app '$existingAppId'.")
- applicationRemoved(existingAppId, true)
- }
+ val timeout = appState.heartbeatTimeout
+ logInfo(s"Received registration request from app $appId (remote address $address, " +
+ s"heartbeat timeout $timeout ms).")
+ if (connectedApps.containsKey(appId)) {
+ logWarning(s"Received a registration request from app $appId, but it was already " +
+ s"registered")
}
- connectedApps(address) = appId
+ connectedApps.put(appId, appState)
callback.onSuccess(ByteBuffer.allocate(0))
+ case Heartbeat(appId) =>
+ val address = client.getSocketAddress
+ Option(connectedApps.get(appId)) match {
+ case Some(existingAppState) =>
+ logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " +
+ s"address $address).")
+ existingAppState.lastHeartbeat = System.nanoTime()
+ case None =>
+ logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " +
+ s"address $address, appId '$appId').")
+ }
case _ => super.handleMessage(message, client, callback)
}
}
- /**
- * On connection termination, clean up shuffle files written by the associated application.
- */
- override def channelInactive(client: TransportClient): Unit = {
- val address = client.getSocketAddress
- if (connectedApps.contains(address)) {
- val appId = connectedApps(address)
- logInfo(s"Application $appId disconnected (address was $address).")
- applicationRemoved(appId, true /* cleanupLocalDirs */)
- connectedApps.remove(address)
- } else {
- logWarning(s"Unknown $address disconnected.")
- }
- }
-
/** An extractor object for matching [[RegisterDriver]] message. */
private object RegisterDriverParam {
- def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
+ def unapply(r: RegisterDriver): Option[(String, AppState)] =
+ Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime())))
+ }
+
+ private object Heartbeat {
+ def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId)
+ }
+
+ private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long)
+
+ private class CleanerThread extends Runnable {
+ override def run(): Unit = {
+ val now = System.nanoTime()
+ connectedApps.asScala.foreach { case (appId, appState) =>
+ if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) {
+ logInfo(s"Application $appId timed out. Removing shuffle files.")
+ connectedApps.remove(appId)
+ applicationRemoved(appId, true)
+ }
+ }
+ }
}
}
@@ -93,7 +113,8 @@ private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManage
protected override def newShuffleBlockHandler(
conf: TransportConf): ExternalShuffleBlockHandler = {
- new MesosExternalShuffleBlockHandler(conf)
+ val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s")
+ new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index e1180980ee..90b1813750 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -448,7 +448,12 @@ private[spark] class CoarseMesosSchedulerBackend(
s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
mesosExternalShuffleClient.get
- .registerDriverWithShuffleService(slave.hostname, externalShufflePort)
+ .registerDriverWithShuffleService(
+ slave.hostname,
+ externalShufflePort,
+ sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
+ s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"),
+ sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
slave.shuffleRegistered = true
}
@@ -506,6 +511,9 @@ private[spark] class CoarseMesosSchedulerBackend(
+ "on the mesos nodes.")
}
+ // Close the mesos external shuffle client if used
+ mesosExternalShuffleClient.foreach(_.close())
+
if (mesosDriver != null) {
mesosDriver.stop()
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index dd76644288..b18f0eb162 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -192,7 +192,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING)
backend.statusUpdate(driver, status2)
- verify(externalShuffleClient, times(1)).registerDriverWithShuffleService(anyString, anyInt)
+ verify(externalShuffleClient, times(1))
+ .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong)
}
test("mesos kills an executor when told") {