From a0aac4b775bc8c275f96ad0fbf85c9d8a3690588 Mon Sep 17 00:00:00 2001 From: Angus Gerry Date: Thu, 1 Sep 2016 10:35:31 -0700 Subject: [SPARK-16533][CORE] resolve deadlocking in driver when executors die ## What changes were proposed in this pull request? This pull request reverts the changes made as a part of #14605, which simply side-steps the deadlock issue. Instead, I propose the following approach: * Use `scheduleWithFixedDelay` when calling `ExecutorAllocationManager.schedule` for scheduling executor requests. The intent of this is that if invocations are delayed beyond the default schedule interval on account of lock contention, then we avoid a situation where calls to `schedule` are made back-to-back, potentially releasing and then immediately reacquiring these locks - further exacerbating contention. * Replace a number of calls to `askWithRetry` with `ask` inside of message handling code in `CoarseGrainedSchedulerBackend` and its ilk. This allows us queue messages with the relevant endpoints, release whatever locks we might be holding, and then block whilst awaiting the response. This change is made at the cost of being able to retry should sending the message fail, as retrying outside of the lock could easily cause race conditions if other conflicting messages have been sent whilst awaiting a response. I believe this to be the lesser of two evils, as in many cases these RPC calls are to process local components, and so failures are more likely to be deterministic, and timeouts are more likely to be caused by lock contention. ## How was this patch tested? Existing tests, and manual tests under yarn-client mode. Author: Angus Gerry Closes #14710 from angolon/SPARK-16533. --- .../org/apache/spark/HeartbeatReceiverSuite.scala | 9 ++++--- .../spark/deploy/client/AppClientSuite.scala | 30 ++++++++++++++++------ 2 files changed, 27 insertions(+), 12 deletions(-) (limited to 'core/src/test/scala/org/apache') diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 5f59c176ab..915d7a1b8b 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -21,6 +21,7 @@ import java.util.concurrent.{ExecutorService, TimeUnit} import scala.collection.Map import scala.collection.mutable +import scala.concurrent.Future import scala.concurrent.duration._ import org.mockito.Matchers @@ -269,13 +270,13 @@ private class FakeSchedulerBackend( clusterManagerEndpoint: RpcEndpointRef) extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { - protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { - clusterManagerEndpoint.askWithRetry[Boolean]( + protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { + clusterManagerEndpoint.ask[Boolean]( RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)) } - protected override def doKillExecutors(executorIds: Seq[String]): Boolean = { - clusterManagerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds)) + protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = { + clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds)) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index f6ef9d15dd..416efaa75b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue import scala.concurrent.duration._ import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.{Eventually, ScalaFutures} import org.apache.spark._ import org.apache.spark.deploy.{ApplicationDescription, Command} @@ -36,7 +36,12 @@ import org.apache.spark.util.Utils /** * End-to-end tests for application client in standalone mode. */ -class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll { +class AppClientSuite + extends SparkFunSuite + with LocalSparkContext + with BeforeAndAfterAll + with Eventually + with ScalaFutures { private val numWorkers = 2 private val conf = new SparkConf() private val securityManager = new SecurityManager(conf) @@ -93,7 +98,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd // Send message to Master to request Executors, verify request by change in executor limit val numExecutorsRequested = 1 - assert(ci.client.requestTotalExecutors(numExecutorsRequested)) + whenReady( + ci.client.requestTotalExecutors(numExecutorsRequested), + timeout(10.seconds), + interval(10.millis)) { acknowledged => + assert(acknowledged) + } eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() @@ -101,10 +111,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd } // Send request to kill executor, verify request was made - assert { - val apps = getApplications() - val executorId: String = apps.head.executors.head._2.fullId - ci.client.killExecutors(Seq(executorId)) + val executorId: String = getApplications().head.executors.head._2.fullId + whenReady( + ci.client.killExecutors(Seq(executorId)), + timeout(10.seconds), + interval(10.millis)) { acknowledged => + assert(acknowledged) } // Issue stop command for Client to disconnect from Master @@ -122,7 +134,9 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd val ci = new AppClientInst(masterRpcEnv.address.toSparkURL) // requests to master should fail immediately - assert(ci.client.requestTotalExecutors(3) === false) + whenReady(ci.client.requestTotalExecutors(3), timeout(1.seconds)) { success => + assert(success === false) + } } // =============================== -- cgit v1.2.3