author	Angus Gerry <angolon@gmail.com>	2016-09-01 10:35:31 -0700
committer	Marcelo Vanzin <vanzin@cloudera.com>	2016-09-01 10:35:31 -0700
commit	a0aac4b775bc8c275f96ad0fbf85c9d8a3690588 (patch)
tree	94e7f9f8d5eb9f1b32009d99caea48376cc84c81 /mesos
parent	adaaffa34ef0ef6a7baa5c1fea848cf5bc3987a2 (diff)
[SPARK-16533][CORE] resolve deadlocking in driver when executors die
## What changes were proposed in this pull request?

This pull request reverts the changes made as a part of #14605, which simply side-steps the deadlock issue. Instead, I propose the following approach:

* Use `scheduleWithFixedDelay` when calling `ExecutorAllocationManager.schedule` for scheduling executor requests. The intent is that if invocations are delayed beyond the default schedule interval on account of lock contention, we avoid a situation where calls to `schedule` are made back-to-back, potentially releasing and then immediately reacquiring these locks and further exacerbating contention (see the sketch after this message).
* Replace a number of calls to `askWithRetry` with `ask` inside the message-handling code in `CoarseGrainedSchedulerBackend` and its ilk. This allows us to queue messages with the relevant endpoints, release whatever locks we might be holding, and then block whilst awaiting the response. This change comes at the cost of being able to retry should sending the message fail, as retrying outside of the lock could easily cause race conditions if other conflicting messages have been sent whilst awaiting a response. I believe this to be the lesser of two evils: in many cases these RPC calls are to process-local components, so failures are more likely to be deterministic, and timeouts are more likely to be caused by lock contention.

## How was this patch tested?

Existing tests, and manual tests under yarn-client mode.

Author: Angus Gerry <angolon@gmail.com>

Closes #14710 from angolon/SPARK-16533.
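The first bullet hinges on the difference between fixed-rate and fixed-delay scheduling in `java.util.concurrent`. The following minimal sketch is illustrative only, not Spark's actual `ExecutorAllocationManager` code; it shows why a fixed-delay schedule cannot produce the back-to-back runs the commit message describes:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ScheduleDemo {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()

    val task = new Runnable {
      def run(): Unit = {
        // Simulate a schedule() invocation that blocks on a contended
        // lock for longer than the 100 ms scheduling interval.
        Thread.sleep(250)
        println(s"ran at ${System.currentTimeMillis()} ms")
      }
    }

    // scheduleAtFixedRate tries to keep the 100 ms period: after a slow
    // run, the overdue executions fire back-to-back, immediately
    // re-acquiring the contended lock.
    // executor.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS)

    // scheduleWithFixedDelay instead waits 100 ms after each run
    // completes, so a slow run pushes later runs out rather than
    // bunching them together.
    executor.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS)

    Thread.sleep(1000)
    executor.shutdownNow()
  }
}
```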
Diffstat (limited to 'mesos')
-rw-r--r--	mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala	| 5
-rw-r--r--	mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala	| 13
2 files changed, 13 insertions(+), 5 deletions(-)
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index fde1fb3228..a64b5768c5 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.concurrent.Future
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -606,7 +607,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
super.applicationId
}
- override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
// We don't truly know if we can fulfill the full amount of executors
// since at coarse grain it depends on the amount of slaves available.
logInfo("Capping the total amount of executors to " + requestedTotal)
@@ -614,7 +615,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
true
}
- override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
if (mesosDriver == null) {
logWarning("Asked to kill executors before the Mesos driver was started.")
false
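The switch to `Future[Boolean]` above dovetails with the `askWithRetry`-to-`ask` change described in the commit message: the backend can hand back an already-completed future from inside a lock, and the caller blocks on it only after the lock is released. A minimal sketch of that pattern, using hypothetical names rather than Spark's real `CoarseGrainedSchedulerBackend` API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class Backend {
  private val lock = new Object

  // Synchronous work can still satisfy a Future-returning signature by
  // wrapping its result in an already-completed Future, as the Mesos
  // backend does above with Future.successful.
  def doKill(ids: Seq[String]): Future[Boolean] = Future.successful(ids.nonEmpty)

  def killExecutors(ids: Seq[String]): Boolean = {
    val response: Future[Boolean] = lock.synchronized {
      // Enqueue the request while holding the lock...
      doKill(ids)
    }
    // ...but only block for the reply after the lock has been released,
    // so a handler that needs the same lock can no longer deadlock us.
    Await.result(response, 10.seconds)
  }
}

object AskDemo extends App {
  println(new Backend().killExecutors(Seq("0"))) // prints: true
}
```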
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index d98ddb2700..6948be0ead 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.reflect.ClassTag
@@ -27,6 +28,7 @@ import org.apache.mesos.Protos._
import org.mockito.Matchers
import org.mockito.Matchers._
import org.mockito.Mockito._
+import org.scalatest.concurrent.ScalaFutures
import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter
@@ -40,7 +42,8 @@ import org.apache.spark.scheduler.cluster.mesos.Utils._
class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
with LocalSparkContext
with MockitoSugar
- with BeforeAndAfter {
+ with BeforeAndAfter
+ with ScalaFutures {
private var sparkConf: SparkConf = _
private var driver: SchedulerDriver = _
@@ -50,6 +53,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private var driverEndpoint: RpcEndpointRef = _
@volatile private var stopCalled = false
+ // All 'requests' to the scheduler run immediately on the same thread, so
+ // demand that all futures have their value available immediately.
+ implicit override val patienceConfig = PatienceConfig(timeout = 0.seconds)
+
test("mesos supports killing and limiting executors") {
setBackend()
sparkConf.set("spark.driver.host", "driverHost")
@@ -64,8 +71,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
verifyTaskLaunched(driver, "o1")
// kills executors
- backend.doRequestTotalExecutors(0)
- assert(backend.doKillExecutors(Seq("0")))
+ assert(backend.doRequestTotalExecutors(0).futureValue)
+ assert(backend.doKillExecutors(Seq("0")).futureValue)
val taskID0 = createTaskId("0")
verify(driver, times(1)).killTask(taskID0)
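For reference, the test-side idiom used above: ScalaTest's `ScalaFutures` mixin supplies `futureValue`, and a zero-timeout `PatienceConfig` turns it into an assertion that the future is already complete. A small self-contained sketch (hypothetical suite name, not part of this patch):

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

import org.scalatest.FunSuite
import org.scalatest.concurrent.ScalaFutures

class AlreadyCompleteSuite extends FunSuite with ScalaFutures {

  // A zero timeout means futureValue only succeeds for futures that are
  // already satisfied when it is called.
  implicit override val patienceConfig = PatienceConfig(timeout = 0.seconds)

  test("a completed future is unwrapped without waiting") {
    // Future.successful is complete on construction, so futureValue
    // returns its value immediately despite the zero patience.
    assert(Future.successful(true).futureValue)
  }
}
```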