-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                                       |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala                               |  38
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala                 | 105
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala                    |  10
-rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala                                          |   9
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala                                    |  30
-rw-r--r--  mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala     |   5
-rw-r--r--  mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala|  13
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala                          |  95
9 files changed, 169 insertions(+), 138 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 932ba16812..6f320c5242 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -230,7 +230,7 @@ private[spark] class ExecutorAllocationManager(
}
}
}
- executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+ executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
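This file's only change swaps scheduleAtFixedRate for scheduleWithFixedDelay. The distinction matters because an iteration of scheduleTask can still block on a cluster-manager round trip (requestTotalExecutors awaits the RPC reply below): with a fixed rate, overdue runs queue up and fire back-to-back once a slow run finishes, while a fixed delay always waits the full interval after the previous run returns. A minimal, self-contained sketch of the fixed-delay behaviour; the names below are illustrative, not from the patch:

    import java.util.concurrent.{Executors, TimeUnit}

    object FixedDelaySketch {
      def main(args: Array[String]): Unit = {
        val executor = Executors.newSingleThreadScheduledExecutor()
        val task = new Runnable {
          override def run(): Unit = {
            println(s"tick at ${System.currentTimeMillis()}")
            Thread.sleep(250) // simulate a slow iteration, e.g. a blocking request
          }
        }
        // The next run starts 100 ms *after* the previous run returns, so slow
        // iterations never pile up the way they can with scheduleAtFixedRate.
        executor.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS)
        Thread.sleep(2000)
        executor.shutdownNow()
      }
    }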
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index a9df732df9..7a60f08aad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -21,6 +21,8 @@ import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import org.apache.spark.SparkConf
@@ -79,11 +81,6 @@ private[spark] class StandaloneAppClient(
private val registrationRetryThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
- // A thread pool to perform receive then reply actions in a thread so as not to block the
- // event loop.
- private val askAndReplyThreadPool =
- ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
-
override def onStart(): Unit = {
try {
registerWithMaster(1)
@@ -220,19 +217,13 @@ private[spark] class StandaloneAppClient(
endpointRef: RpcEndpointRef,
context: RpcCallContext,
msg: T): Unit = {
- // Create a thread to ask a message and reply with the result. Allow thread to be
+ // Ask a message and create a thread to reply with the result. Allow thread to be
// interrupted during shutdown, otherwise context must be notified of NonFatal errors.
- askAndReplyThreadPool.execute(new Runnable {
- override def run(): Unit = {
- try {
- context.reply(endpointRef.askWithRetry[Boolean](msg))
- } catch {
- case ie: InterruptedException => // Cancelled
- case NonFatal(t) =>
- context.sendFailure(t)
- }
- }
- })
+ endpointRef.ask[Boolean](msg).andThen {
+ case Success(b) => context.reply(b)
+ case Failure(ie: InterruptedException) => // Cancelled
+ case Failure(NonFatal(t)) => context.sendFailure(t)
+ }(ThreadUtils.sameThread)
}
override def onDisconnected(address: RpcAddress): Unit = {
@@ -272,7 +263,6 @@ private[spark] class StandaloneAppClient(
registrationRetryThread.shutdownNow()
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
- askAndReplyThreadPool.shutdownNow()
}
}
@@ -301,12 +291,12 @@ private[spark] class StandaloneAppClient(
*
* @return whether the request is acknowledged.
*/
- def requestTotalExecutors(requestedTotal: Int): Boolean = {
+ def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
if (endpoint.get != null && appId.get != null) {
- endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal))
+ endpoint.get.ask[Boolean](RequestExecutors(appId.get, requestedTotal))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
- false
+ Future.successful(false)
}
}
@@ -314,12 +304,12 @@ private[spark] class StandaloneAppClient(
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
- def killExecutors(executorIds: Seq[String]): Boolean = {
+ def killExecutors(executorIds: Seq[String]): Future[Boolean] = {
if (endpoint.get != null && appId.get != null) {
- endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds))
+ endpoint.get.ask[Boolean](KillExecutors(appId.get, executorIds))
} else {
logWarning("Attempted to kill executors before driver fully initialized.")
- false
+ Future.successful(false)
}
}
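StandaloneAppClient now hands back the Future returned by RpcEndpointRef.ask instead of blocking in askWithRetry on a dedicated thread pool, and reply callbacks run on ThreadUtils.sameThread. The same shape with plain scala.concurrent futures, as a hedged sketch in which askMaster is a stand-in rather than Spark's RPC API:

    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.{Failure, Success}
    import scala.util.control.NonFatal

    object AskSketch {
      // Stand-in for an RPC ask that answers asynchronously; illustrative only.
      def askMaster(requestedTotal: Int)(implicit ec: ExecutionContext): Future[Boolean] =
        Future { requestedTotal >= 0 }

      def main(args: Array[String]): Unit = {
        implicit val ec: ExecutionContext = ExecutionContext.global
        // Register callbacks on the reply instead of parking a worker thread on it.
        askMaster(1).andThen {
          case Success(acknowledged) => println(s"acknowledged = $acknowledged")
          case Failure(NonFatal(t))  => println(s"request failed: $t")
        }
        Thread.sleep(500) // keep the JVM alive long enough for the callback to fire
      }
    }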
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2db3a3bb81..6d26705377 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.internal.Logging
@@ -49,6 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected val totalRegisteredExecutors = new AtomicInteger(0)
protected val conf = scheduler.sc.conf
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
private val _minRegisteredRatio =
@@ -272,6 +275,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Remove a disconnected slave from the cluster
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+ logDebug(s"Asked to remove executor $executorId with reason $reason")
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
@@ -446,19 +450,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged.
*/
- final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+ final override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
if (numAdditionalExecutors < 0) {
throw new IllegalArgumentException(
"Attempted to request a negative number of additional executor(s) " +
s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
}
logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
- logDebug(s"Number of pending executors is now $numPendingExecutors")
- numPendingExecutors += numAdditionalExecutors
- // Account for executors pending to be added or removed
- val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
- doRequestTotalExecutors(newTotal)
+ val response = synchronized {
+ numPendingExecutors += numAdditionalExecutors
+ logDebug(s"Number of pending executors is now $numPendingExecutors")
+
+ // Account for executors pending to be added or removed
+ doRequestTotalExecutors(
+ numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+ }
+
+ defaultAskTimeout.awaitResult(response)
}
/**
@@ -479,19 +488,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]
- ): Boolean = synchronized {
+ ): Boolean = {
if (numExecutors < 0) {
throw new IllegalArgumentException(
"Attempted to request a negative number of executor(s) " +
s"$numExecutors from the cluster manager. Please specify a positive number!")
}
- this.localityAwareTasks = localityAwareTasks
- this.hostToLocalTaskCount = hostToLocalTaskCount
+ val response = synchronized {
+ this.localityAwareTasks = localityAwareTasks
+ this.hostToLocalTaskCount = hostToLocalTaskCount
+
+ numPendingExecutors =
+ math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
- numPendingExecutors =
- math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
- doRequestTotalExecutors(numExecutors)
+ doRequestTotalExecutors(numExecutors)
+ }
+
+ defaultAskTimeout.awaitResult(response)
}
/**
@@ -504,16 +518,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* insufficient resources to satisfy the first request. We make the assumption here that the
* cluster manager will eventually fulfill all requests when resources free up.
*
- * @return whether the request is acknowledged.
+ * @return a future whose evaluation indicates whether the request is acknowledged.
*/
- protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
+ protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
+ Future.successful(false)
/**
* Request that the cluster manager kill the specified executors.
* @return whether the kill request is acknowledged. If list to kill is empty, it will return
* false.
*/
- final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+ final override def killExecutors(executorIds: Seq[String]): Boolean = {
killExecutors(executorIds, replace = false, force = false)
}
@@ -533,39 +548,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
- force: Boolean): Boolean = synchronized {
+ force: Boolean): Boolean = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
- val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
- unknownExecutors.foreach { id =>
- logWarning(s"Executor to kill $id does not exist!")
- }
- // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
- // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
- val executorsToKill = knownExecutors
- .filter { id => !executorsPendingToRemove.contains(id) }
- .filter { id => force || !scheduler.isExecutorBusy(id) }
- executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
-
- // If we do not wish to replace the executors we kill, sync the target number of executors
- // with the cluster manager to avoid allocating new ones. When computing the new target,
- // take into account executors that are pending to be added or removed.
- if (!replace) {
- doRequestTotalExecutors(
- numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
- } else {
- numPendingExecutors += knownExecutors.size
+ val response = synchronized {
+ val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
+ unknownExecutors.foreach { id =>
+ logWarning(s"Executor to kill $id does not exist!")
+ }
+
+ // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+ // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+ val executorsToKill = knownExecutors
+ .filter { id => !executorsPendingToRemove.contains(id) }
+ .filter { id => force || !scheduler.isExecutorBusy(id) }
+ executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+
+ // If we do not wish to replace the executors we kill, sync the target number of executors
+ // with the cluster manager to avoid allocating new ones. When computing the new target,
+ // take into account executors that are pending to be added or removed.
+ val adjustTotalExecutors =
+ if (!replace) {
+ doRequestTotalExecutors(
+ numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+ } else {
+ numPendingExecutors += knownExecutors.size
+ Future.successful(true)
+ }
+
+ val killExecutors: Boolean => Future[Boolean] =
+ if (!executorsToKill.isEmpty) {
+ _ => doKillExecutors(executorsToKill)
+ } else {
+ _ => Future.successful(false)
+ }
+
+ adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
}
- !executorsToKill.isEmpty && doKillExecutors(executorsToKill)
+ defaultAskTimeout.awaitResult(response)
}
/**
* Kill the given list of executors through the cluster manager.
* @return whether the kill request is acknowledged.
*/
- protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
-
+ protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
+ Future.successful(false)
}
private[spark] object CoarseGrainedSchedulerBackend {
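The public requestExecutors/requestTotalExecutors/killExecutors entry points stay blocking for their callers, but the monitor is no longer held while waiting: shared state is mutated and the asynchronous request is started inside synchronized, and only the resulting Future is awaited after the lock is released. A compact sketch of that shape, assuming plain scala.concurrent primitives rather than Spark's RpcTimeout:

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    class TargetTracker {
      private implicit val ec: ExecutionContext = ExecutionContext.global
      private val defaultAskTimeout: Duration = 120.seconds
      private var numPendingExecutors = 0

      // Stand-in for the asynchronous request to the cluster manager.
      private def doRequestTotalExecutors(total: Int): Future[Boolean] =
        Future { total >= 0 }

      def requestExecutors(numAdditional: Int): Boolean = {
        // Mutate shared state and kick off the request while holding the lock...
        val response = synchronized {
          numPendingExecutors += numAdditional
          doRequestTotalExecutors(numPendingExecutors)
        }
        // ...then block on the reply only after the lock has been released.
        Await.result(response, defaultAskTimeout)
      }
    }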
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8382fbe9dd..5068bf2e66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.cluster
import java.util.concurrent.Semaphore
+import scala.concurrent.Future
+
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
@@ -173,12 +175,12 @@ private[spark] class StandaloneSchedulerBackend(
*
* @return whether the request is acknowledged.
*/
- protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
Option(client) match {
case Some(c) => c.requestTotalExecutors(requestedTotal)
case None =>
logWarning("Attempted to request executors before driver fully initialized.")
- false
+ Future.successful(false)
}
}
@@ -186,12 +188,12 @@ private[spark] class StandaloneSchedulerBackend(
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
- protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
Option(client) match {
case Some(c) => c.killExecutors(executorIds)
case None =>
logWarning("Attempted to kill executors before driver fully initialized.")
- false
+ Future.successful(false)
}
}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 5f59c176ab..915d7a1b8b 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.{ExecutorService, TimeUnit}
import scala.collection.Map
import scala.collection.mutable
+import scala.concurrent.Future
import scala.concurrent.duration._
import org.mockito.Matchers
@@ -269,13 +270,13 @@ private class FakeSchedulerBackend(
clusterManagerEndpoint: RpcEndpointRef)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
- protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- clusterManagerEndpoint.askWithRetry[Boolean](
+ protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+ clusterManagerEndpoint.ask[Boolean](
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}
- protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- clusterManagerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
+ protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+ clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index f6ef9d15dd..416efaa75b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.duration._
import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.apache.spark._
import org.apache.spark.deploy.{ApplicationDescription, Command}
@@ -36,7 +36,12 @@ import org.apache.spark.util.Utils
/**
* End-to-end tests for application client in standalone mode.
*/
-class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll {
+class AppClientSuite
+ extends SparkFunSuite
+ with LocalSparkContext
+ with BeforeAndAfterAll
+ with Eventually
+ with ScalaFutures {
private val numWorkers = 2
private val conf = new SparkConf()
private val securityManager = new SecurityManager(conf)
@@ -93,7 +98,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
// Send message to Master to request Executors, verify request by change in executor limit
val numExecutorsRequested = 1
- assert(ci.client.requestTotalExecutors(numExecutorsRequested))
+ whenReady(
+ ci.client.requestTotalExecutors(numExecutorsRequested),
+ timeout(10.seconds),
+ interval(10.millis)) { acknowledged =>
+ assert(acknowledged)
+ }
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
@@ -101,10 +111,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
}
// Send request to kill executor, verify request was made
- assert {
- val apps = getApplications()
- val executorId: String = apps.head.executors.head._2.fullId
- ci.client.killExecutors(Seq(executorId))
+ val executorId: String = getApplications().head.executors.head._2.fullId
+ whenReady(
+ ci.client.killExecutors(Seq(executorId)),
+ timeout(10.seconds),
+ interval(10.millis)) { acknowledged =>
+ assert(acknowledged)
}
// Issue stop command for Client to disconnect from Master
@@ -122,7 +134,9 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
// requests to master should fail immediately
- assert(ci.client.requestTotalExecutors(3) === false)
+ whenReady(ci.client.requestTotalExecutors(3), timeout(1.seconds)) { success =>
+ assert(success === false)
+ }
}
// ===============================
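With the client returning futures, the suite asserts through ScalaFutures.whenReady, which polls until the future completes (or the timeout expires) and then runs the assertion on its value. A minimal standalone sketch of the same pattern, assuming the ScalaTest and Scala versions the suite already builds against:

    import scala.concurrent.Future
    import scala.concurrent.duration._
    import org.scalatest.FunSuite
    import org.scalatest.concurrent.ScalaFutures

    class WhenReadySketchSuite extends FunSuite with ScalaFutures {
      test("acknowledged request") {
        val acknowledgedF: Future[Boolean] = Future.successful(true)
        // Poll every 10 ms for up to 10 s, then assert on the completed value.
        whenReady(acknowledgedF, timeout(10.seconds), interval(10.millis)) { acknowledged =>
          assert(acknowledged)
        }
      }
    }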
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index fde1fb3228..a64b5768c5 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.concurrent.Future
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -606,7 +607,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
super.applicationId
}
- override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
// We don't truly know if we can fulfill the full amount of executors
// since at coarse grain it depends on the amount of slaves available.
logInfo("Capping the total amount of executors to " + requestedTotal)
@@ -614,7 +615,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
true
}
- override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
if (mesosDriver == null) {
logWarning("Asked to kill executors before the Mesos driver was started.")
false
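Wrapping the existing bodies in Future.successful { ... } keeps the Mesos backend effectively synchronous: the block is evaluated on the calling thread before the already-completed Future is constructed, so only the signature changes. A tiny sketch of those semantics, with an illustrative method name:

    import scala.concurrent.Future

    object EagerFutureSketch {
      // The block runs eagerly on the caller's thread; Future.successful merely
      // wraps its already-computed result in a completed future.
      def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
        println(s"capping the total number of executors at $requestedTotal") // happens immediately
        true
      }

      def main(args: Array[String]): Unit = {
        val f = doRequestTotalExecutors(4)
        println(f.value) // Some(Success(true)): already completed, no ExecutionContext needed
      }
    }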
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index d98ddb2700..6948be0ead 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.reflect.ClassTag
@@ -27,6 +28,7 @@ import org.apache.mesos.Protos._
import org.mockito.Matchers
import org.mockito.Matchers._
import org.mockito.Mockito._
+import org.scalatest.concurrent.ScalaFutures
import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter
@@ -40,7 +42,8 @@ import org.apache.spark.scheduler.cluster.mesos.Utils._
class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
with LocalSparkContext
with MockitoSugar
- with BeforeAndAfter {
+ with BeforeAndAfter
+ with ScalaFutures {
private var sparkConf: SparkConf = _
private var driver: SchedulerDriver = _
@@ -50,6 +53,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private var driverEndpoint: RpcEndpointRef = _
@volatile private var stopCalled = false
+ // All 'requests' to the scheduler run immediately on the same thread, so
+ // demand that all futures have their value available immediately.
+ implicit override val patienceConfig = PatienceConfig(timeout = 0.seconds)
+
test("mesos supports killing and limiting executors") {
setBackend()
sparkConf.set("spark.driver.host", "driverHost")
@@ -64,8 +71,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
verifyTaskLaunched(driver, "o1")
// kills executors
- backend.doRequestTotalExecutors(0)
- assert(backend.doKillExecutors(Seq("0")))
+ assert(backend.doRequestTotalExecutors(0).futureValue)
+ assert(backend.doKillExecutors(Seq("0")).futureValue)
val taskID0 = createTaskId("0")
verify(driver, times(1)).killTask(taskID0)
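Because the Mesos backend's futures are completed eagerly (see the Future.successful sketch above), the suite can afford a zero-timeout PatienceConfig: futureValue returns the value of an already-completed future immediately and would fail fast if the backend ever became genuinely asynchronous. A short sketch of that assertion style, under the same ScalaTest assumptions as above:

    import scala.concurrent.Future
    import scala.concurrent.duration._
    import org.scalatest.FunSuite
    import org.scalatest.concurrent.ScalaFutures

    class FutureValueSketchSuite extends FunSuite with ScalaFutures {
      // Demand that futures are already complete when futureValue is called.
      implicit override val patienceConfig = PatienceConfig(timeout = 0.seconds)

      test("eagerly completed futures need no waiting") {
        assert(Future.successful(true).futureValue)
      }
    }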
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index ea63ff5dc1..2f9ea1911f 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster
import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
@@ -124,28 +125,16 @@ private[spark] abstract class YarnSchedulerBackend(
* Request executors from the ApplicationMaster by specifying the total number desired.
* This includes executors already pending or running.
*/
- override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
- yarnSchedulerEndpoint.amEndpoint match {
- case Some(am) =>
- try {
- am.askWithRetry[Boolean](r)
- } catch {
- case NonFatal(e) =>
- logError(s"Sending $r to AM was unsuccessful", e)
- return false
- }
- case None =>
- logWarning("Attempted to request executors before the AM has registered!")
- return false
- }
+ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+ yarnSchedulerEndpointRef.ask[Boolean](
+ RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}
/**
* Request that the ApplicationMaster kill the specified executors.
*/
- override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
+ override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+ yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
}
override def sufficientResourcesRegistered(): Boolean = {
@@ -221,37 +210,37 @@ private[spark] abstract class YarnSchedulerBackend(
*/
private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with Logging {
- var amEndpoint: Option[RpcEndpointRef] = None
-
- private val askAmThreadPool =
- ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
- implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
+ private var amEndpoint: Option[RpcEndpointRef] = None
private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
executorId: String,
executorRpcAddress: RpcAddress): Unit = {
- amEndpoint match {
+ val removeExecutorMessage = amEndpoint match {
case Some(am) =>
val lossReasonRequest = GetExecutorLossReason(executorId)
- val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
- future onSuccess {
- case reason: ExecutorLossReason =>
- driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
- }
- future onFailure {
- case NonFatal(e) =>
- logWarning(s"Attempted to get executor loss reason" +
- s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
- s" but got no response. Marking as slave lost.", e)
- driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
- case t => throw t
- }
+ am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
+ .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
+ .recover {
+ case NonFatal(e) =>
+ logWarning(s"Attempted to get executor loss reason" +
+ s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+ s" but got no response. Marking as slave lost.", e)
+ RemoveExecutor(executorId, SlaveLost())
+ }(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to check for an executor loss reason" +
" before the AM has registered!")
- driverEndpoint.askWithRetry[Boolean](
- RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
+ Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
}
+
+ removeExecutorMessage
+ .flatMap { message =>
+ driverEndpoint.ask[Boolean](message)
+ }(ThreadUtils.sameThread)
+ .onFailure {
+ case NonFatal(e) => logError(
+ s"Error requesting driver to remove executor $executorId after disconnection.", e)
+ }(ThreadUtils.sameThread)
}
override def receive: PartialFunction[Any, Unit] = {
@@ -269,9 +258,13 @@ private[spark] abstract class YarnSchedulerBackend(
case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
- case RemoveExecutor(executorId, reason) =>
+ case r @ RemoveExecutor(executorId, reason) =>
logWarning(reason.toString)
- removeExecutor(executorId, reason)
+ driverEndpoint.ask[Boolean](r).onFailure {
+ case e =>
+ logError("Error requesting driver to remove executor" +
+ s" $executorId for reason $reason", e)
+ }(ThreadUtils.sameThread)
}
@@ -279,13 +272,12 @@ private[spark] abstract class YarnSchedulerBackend(
case r: RequestExecutors =>
amEndpoint match {
case Some(am) =>
- Future {
- context.reply(am.askWithRetry[Boolean](r))
- } onFailure {
- case NonFatal(e) =>
+ am.ask[Boolean](r).andThen {
+ case Success(b) => context.reply(b)
+ case Failure(NonFatal(e)) =>
logError(s"Sending $r to AM was unsuccessful", e)
context.sendFailure(e)
- }
+ }(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to request executors before the AM has registered!")
context.reply(false)
@@ -294,13 +286,12 @@ private[spark] abstract class YarnSchedulerBackend(
case k: KillExecutors =>
amEndpoint match {
case Some(am) =>
- Future {
- context.reply(am.askWithRetry[Boolean](k))
- } onFailure {
- case NonFatal(e) =>
+ am.ask[Boolean](k).andThen {
+ case Success(b) => context.reply(b)
+ case Failure(NonFatal(e)) =>
logError(s"Sending $k to AM was unsuccessful", e)
context.sendFailure(e)
- }
+ }(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
@@ -316,10 +307,6 @@ private[spark] abstract class YarnSchedulerBackend(
amEndpoint = None
}
}
-
- override def onStop(): Unit = {
- askAmThreadPool.shutdownNow()
- }
}
}
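The YARN endpoint now expresses the whole disconnect flow as one non-blocking chain: look up the loss reason, fall back to SlaveLost on failure, then ask the driver to remove the executor, all composed on ThreadUtils.sameThread instead of a dedicated ask thread pool. A self-contained sketch of the same composition, where the two ask calls are stand-ins rather than Spark's RPC API:

    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.Failure
    import scala.util.control.NonFatal

    object LossReasonChainSketch {
      // Runs callbacks on the thread that completes the future, like ThreadUtils.sameThread.
      private val sameThread: ExecutionContext = new ExecutionContext {
        def execute(runnable: Runnable): Unit = runnable.run()
        def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
      }

      // Stand-ins for the AM and driver asks; illustrative only.
      def askLossReason(executorId: String): Future[String] =
        Future.failed(new RuntimeException("AM did not answer"))
      def askDriverToRemove(executorId: String, reason: String): Future[Boolean] =
        Future.successful(true)

      def handleDisconnect(executorId: String): Unit = {
        askLossReason(executorId)
          .recover { case NonFatal(e) => s"slave lost (${e.getMessage})" }(sameThread)
          .flatMap { reason => askDriverToRemove(executorId, reason) }(sameThread)
          .onComplete {
            case Failure(NonFatal(e)) => println(s"failed to remove executor $executorId: $e")
            case _ => () // success: nothing more to do
          }(sameThread)
      }

      def main(args: Array[String]): Unit = handleDisconnect("42")
    }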