Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                      |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala              |  38
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 105
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala   |  10
-rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala                         |   9
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala                   |  30
6 files changed, 115 insertions(+), 79 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 932ba16812..6f320c5242 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -230,7 +230,7 @@ private[spark] class ExecutorAllocationManager(
}
}
}
- executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+ executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
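The only change in this file is scheduleAtFixedRate -> scheduleWithFixedDelay: now that the scheduled task can block on an RPC round trip, fixed-rate scheduling could queue up back-to-back runs after a slow iteration, whereas fixed-delay always waits the full interval after the previous run finishes. A minimal, self-contained sketch of the difference, with a hypothetical task and timings (not Spark code):

    import java.util.concurrent.{Executors, TimeUnit}

    object FixedDelayDemo extends App {
      val executor = Executors.newSingleThreadScheduledExecutor()
      val task = new Runnable {
        override def run(): Unit = {
          println(s"tick at ${System.currentTimeMillis()}")
          Thread.sleep(250)  // simulate an iteration that overruns the 100 ms interval
        }
      }
      // With scheduleAtFixedRate the next run would fire immediately after an overrun;
      // scheduleWithFixedDelay always waits the full 100 ms after the previous run completes.
      executor.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS)
      Thread.sleep(2000)
      executor.shutdownNow()
    }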
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index a9df732df9..7a60f08aad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -21,6 +21,8 @@ import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import org.apache.spark.SparkConf
@@ -79,11 +81,6 @@ private[spark] class StandaloneAppClient(
private val registrationRetryThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
- // A thread pool to perform receive then reply actions in a thread so as not to block the
- // event loop.
- private val askAndReplyThreadPool =
- ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
-
override def onStart(): Unit = {
try {
registerWithMaster(1)
@@ -220,19 +217,13 @@ private[spark] class StandaloneAppClient(
endpointRef: RpcEndpointRef,
context: RpcCallContext,
msg: T): Unit = {
- // Create a thread to ask a message and reply with the result. Allow thread to be
+ // Ask the message and reply with the result when the future completes. Allow the ask to be
// interrupted during shutdown, otherwise context must be notified of NonFatal errors.
- askAndReplyThreadPool.execute(new Runnable {
- override def run(): Unit = {
- try {
- context.reply(endpointRef.askWithRetry[Boolean](msg))
- } catch {
- case ie: InterruptedException => // Cancelled
- case NonFatal(t) =>
- context.sendFailure(t)
- }
- }
- })
+ endpointRef.ask[Boolean](msg).andThen {
+ case Success(b) => context.reply(b)
+ case Failure(ie: InterruptedException) => // Cancelled
+ case Failure(NonFatal(t)) => context.sendFailure(t)
+ }(ThreadUtils.sameThread)
}
override def onDisconnected(address: RpcAddress): Unit = {
@@ -272,7 +263,6 @@ private[spark] class StandaloneAppClient(
registrationRetryThread.shutdownNow()
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
- askAndReplyThreadPool.shutdownNow()
}
}
@@ -301,12 +291,12 @@ private[spark] class StandaloneAppClient(
*
* @return whether the request is acknowledged.
*/
- def requestTotalExecutors(requestedTotal: Int): Boolean = {
+ def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
if (endpoint.get != null && appId.get != null) {
- endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal))
+ endpoint.get.ask[Boolean](RequestExecutors(appId.get, requestedTotal))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
- false
+ Future.successful(false)
}
}
@@ -314,12 +304,12 @@ private[spark] class StandaloneAppClient(
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
- def killExecutors(executorIds: Seq[String]): Boolean = {
+ def killExecutors(executorIds: Seq[String]): Future[Boolean] = {
if (endpoint.get != null && appId.get != null) {
- endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds))
+ endpoint.get.ask[Boolean](KillExecutors(appId.get, executorIds))
} else {
logWarning("Attempted to kill executors before driver fully initialized.")
- false
+ Future.successful(false)
}
}
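The dedicated appclient-receive-and-reply-threadpool goes away because ask already returns a Future; the reply is attached as a completion callback via andThen on a same-thread ExecutionContext, so nothing blocks the RPC event loop and no extra threads are needed. A stand-alone sketch of that callback shape, using hypothetical stand-ins for RpcCallContext and the ask result (not Spark's classes):

    import java.util.concurrent.Executor

    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.{Failure, Success}
    import scala.util.control.NonFatal

    object AndThenReplySketch extends App {
      // Stand-in for RpcCallContext, just enough to show where reply/sendFailure happen.
      trait ReplyContext {
        def reply(b: Boolean): Unit
        def sendFailure(t: Throwable): Unit
      }
      val context = new ReplyContext {
        def reply(b: Boolean): Unit = println(s"reply($b)")
        def sendFailure(t: Throwable): Unit = println(s"sendFailure(${t.getMessage})")
      }

      // Runs callbacks on whichever thread completes the future (the role ThreadUtils.sameThread plays).
      val sameThread: ExecutionContext = ExecutionContext.fromExecutor(new Executor {
        def execute(runnable: Runnable): Unit = runnable.run()
      })

      val asked: Future[Boolean] = Future.successful(true)  // stands in for endpointRef.ask[Boolean](msg)

      asked.andThen {
        case Success(b) => context.reply(b)
        case Failure(_: InterruptedException) => // cancelled during shutdown
        case Failure(NonFatal(t)) => context.sendFailure(t)
      }(sameThread)
    }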
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2db3a3bb81..6d26705377 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.internal.Logging
@@ -49,6 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected val totalRegisteredExecutors = new AtomicInteger(0)
protected val conf = scheduler.sc.conf
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
private val _minRegisteredRatio =
@@ -272,6 +275,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Remove a disconnected slave from the cluster
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+ logDebug(s"Asked to remove executor $executorId with reason $reason")
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
@@ -446,19 +450,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged.
*/
- final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+ final override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
if (numAdditionalExecutors < 0) {
throw new IllegalArgumentException(
"Attempted to request a negative number of additional executor(s) " +
s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
}
logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
- logDebug(s"Number of pending executors is now $numPendingExecutors")
- numPendingExecutors += numAdditionalExecutors
- // Account for executors pending to be added or removed
- val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
- doRequestTotalExecutors(newTotal)
+ val response = synchronized {
+ numPendingExecutors += numAdditionalExecutors
+ logDebug(s"Number of pending executors is now $numPendingExecutors")
+
+ // Account for executors pending to be added or removed
+ doRequestTotalExecutors(
+ numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+ }
+
+ defaultAskTimeout.awaitResult(response)
}
/**
@@ -479,19 +488,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]
- ): Boolean = synchronized {
+ ): Boolean = {
if (numExecutors < 0) {
throw new IllegalArgumentException(
"Attempted to request a negative number of executor(s) " +
s"$numExecutors from the cluster manager. Please specify a positive number!")
}
- this.localityAwareTasks = localityAwareTasks
- this.hostToLocalTaskCount = hostToLocalTaskCount
+ val response = synchronized {
+ this.localityAwareTasks = localityAwareTasks
+ this.hostToLocalTaskCount = hostToLocalTaskCount
+
+ numPendingExecutors =
+ math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
- numPendingExecutors =
- math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
- doRequestTotalExecutors(numExecutors)
+ doRequestTotalExecutors(numExecutors)
+ }
+
+ defaultAskTimeout.awaitResult(response)
}
/**
@@ -504,16 +518,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* insufficient resources to satisfy the first request. We make the assumption here that the
* cluster manager will eventually fulfill all requests when resources free up.
*
- * @return whether the request is acknowledged.
+ * @return a future whose evaluation indicates whether the request is acknowledged.
*/
- protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
+ protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
+ Future.successful(false)
/**
* Request that the cluster manager kill the specified executors.
* @return whether the kill request is acknowledged. If list to kill is empty, it will return
* false.
*/
- final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+ final override def killExecutors(executorIds: Seq[String]): Boolean = {
killExecutors(executorIds, replace = false, force = false)
}
@@ -533,39 +548,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
- force: Boolean): Boolean = synchronized {
+ force: Boolean): Boolean = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
- val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
- unknownExecutors.foreach { id =>
- logWarning(s"Executor to kill $id does not exist!")
- }
- // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
- // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
- val executorsToKill = knownExecutors
- .filter { id => !executorsPendingToRemove.contains(id) }
- .filter { id => force || !scheduler.isExecutorBusy(id) }
- executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
-
- // If we do not wish to replace the executors we kill, sync the target number of executors
- // with the cluster manager to avoid allocating new ones. When computing the new target,
- // take into account executors that are pending to be added or removed.
- if (!replace) {
- doRequestTotalExecutors(
- numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
- } else {
- numPendingExecutors += knownExecutors.size
+ val response = synchronized {
+ val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
+ unknownExecutors.foreach { id =>
+ logWarning(s"Executor to kill $id does not exist!")
+ }
+
+ // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+ // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+ val executorsToKill = knownExecutors
+ .filter { id => !executorsPendingToRemove.contains(id) }
+ .filter { id => force || !scheduler.isExecutorBusy(id) }
+ executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+
+ // If we do not wish to replace the executors we kill, sync the target number of executors
+ // with the cluster manager to avoid allocating new ones. When computing the new target,
+ // take into account executors that are pending to be added or removed.
+ val adjustTotalExecutors =
+ if (!replace) {
+ doRequestTotalExecutors(
+ numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+ } else {
+ numPendingExecutors += knownExecutors.size
+ Future.successful(true)
+ }
+
+ val killExecutors: Boolean => Future[Boolean] =
+ if (!executorsToKill.isEmpty) {
+ _ => doKillExecutors(executorsToKill)
+ } else {
+ _ => Future.successful(false)
+ }
+
+ adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
}
- !executorsToKill.isEmpty && doKillExecutors(executorsToKill)
+ defaultAskTimeout.awaitResult(response)
}
/**
* Kill the given list of executors through the cluster manager.
* @return whether the kill request is acknowledged.
*/
- protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
-
+ protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
+ Future.successful(false)
}
private[spark] object CoarseGrainedSchedulerBackend {
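The scheduler-side pattern in the hunks above is worth calling out: the bookkeeping mutation and the asynchronous doRequestTotalExecutors/doKillExecutors call happen inside synchronized, but the blocking wait (defaultAskTimeout.awaitResult) happens only after the lock has been released, so an RPC round trip never runs under the scheduler lock; the kill path additionally chains its two asks with flatMap on ThreadUtils.sameThread before awaiting. A simplified sketch of that shape using plain scala.concurrent primitives, with hypothetical state and a hypothetical request (not the backend itself):

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    object AwaitOutsideLockSketch extends App {
      implicit val ec: ExecutionContext = ExecutionContext.global

      private var numPendingExecutors = 0  // stand-in for the backend's bookkeeping

      // Stands in for the asynchronous ask to the cluster manager.
      private def doRequestTotalExecutors(total: Int): Future[Boolean] = Future { total >= 0 }

      def requestExecutors(numAdditional: Int): Boolean = {
        val response = synchronized {
          numPendingExecutors += numAdditional
          doRequestTotalExecutors(numPendingExecutors)  // fire the request while holding the lock
        }
        // Block outside the lock; in the patch this is defaultAskTimeout.awaitResult(response).
        Await.result(response, 30.seconds)
      }

      println(requestExecutors(2))  // true
    }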
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8382fbe9dd..5068bf2e66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.cluster
import java.util.concurrent.Semaphore
+import scala.concurrent.Future
+
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
@@ -173,12 +175,12 @@ private[spark] class StandaloneSchedulerBackend(
*
* @return whether the request is acknowledged.
*/
- protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
Option(client) match {
case Some(c) => c.requestTotalExecutors(requestedTotal)
case None =>
logWarning("Attempted to request executors before driver fully initialized.")
- false
+ Future.successful(false)
}
}
@@ -186,12 +188,12 @@ private[spark] class StandaloneSchedulerBackend(
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
- protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
Option(client) match {
case Some(c) => c.killExecutors(executorIds)
case None =>
logWarning("Attempted to kill executors before driver fully initialized.")
- false
+ Future.successful(false)
}
}
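Both overrides now fall back to Future.successful(false) rather than a bare false when the client is not yet initialized: the caller still gets a Future, but one that is already completed and costs no thread or RPC. A tiny sketch of that fallback shape, with a hypothetical client trait (not Spark's StandaloneAppClient):

    import scala.concurrent.Future

    object FallbackFutureSketch extends App {
      trait Client { def requestTotalExecutors(total: Int): Future[Boolean] }

      @volatile var client: Client = null  // not yet set, as during early driver startup

      def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
        Option(client) match {
          case Some(c) => c.requestTotalExecutors(requestedTotal)
          case None => Future.successful(false)  // already completed, nothing to await
        }

      println(doRequestTotalExecutors(3).value)  // Some(Success(false))
    }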
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 5f59c176ab..915d7a1b8b 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.{ExecutorService, TimeUnit}
import scala.collection.Map
import scala.collection.mutable
+import scala.concurrent.Future
import scala.concurrent.duration._
import org.mockito.Matchers
@@ -269,13 +270,13 @@ private class FakeSchedulerBackend(
clusterManagerEndpoint: RpcEndpointRef)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
- protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- clusterManagerEndpoint.askWithRetry[Boolean](
+ protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+ clusterManagerEndpoint.ask[Boolean](
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}
- protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- clusterManagerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
+ protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+ clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index f6ef9d15dd..416efaa75b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.duration._
import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.apache.spark._
import org.apache.spark.deploy.{ApplicationDescription, Command}
@@ -36,7 +36,12 @@ import org.apache.spark.util.Utils
/**
* End-to-end tests for application client in standalone mode.
*/
-class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterAll {
+class AppClientSuite
+ extends SparkFunSuite
+ with LocalSparkContext
+ with BeforeAndAfterAll
+ with Eventually
+ with ScalaFutures {
private val numWorkers = 2
private val conf = new SparkConf()
private val securityManager = new SecurityManager(conf)
@@ -93,7 +98,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
// Send message to Master to request Executors, verify request by change in executor limit
val numExecutorsRequested = 1
- assert(ci.client.requestTotalExecutors(numExecutorsRequested))
+ whenReady(
+ ci.client.requestTotalExecutors(numExecutorsRequested),
+ timeout(10.seconds),
+ interval(10.millis)) { acknowledged =>
+ assert(acknowledged)
+ }
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
@@ -101,10 +111,12 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
}
// Send request to kill executor, verify request was made
- assert {
- val apps = getApplications()
- val executorId: String = apps.head.executors.head._2.fullId
- ci.client.killExecutors(Seq(executorId))
+ val executorId: String = getApplications().head.executors.head._2.fullId
+ whenReady(
+ ci.client.killExecutors(Seq(executorId)),
+ timeout(10.seconds),
+ interval(10.millis)) { acknowledged =>
+ assert(acknowledged)
}
// Issue stop command for Client to disconnect from Master
@@ -122,7 +134,9 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
// requests to master should fail immediately
- assert(ci.client.requestTotalExecutors(3) === false)
+ whenReady(ci.client.requestTotalExecutors(3), timeout(1.seconds)) { success =>
+ assert(success === false)
+ }
}
// ===============================
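The suite now mixes in ScalaFutures and asserts through whenReady instead of on a Boolean return value. A minimal, self-contained suite showing just that pattern, with a hypothetical future in place of the client call and explicit Span timeouts rather than the suite's duration conversions:

    import scala.concurrent.Future

    import org.scalatest.FunSuite
    import org.scalatest.concurrent.ScalaFutures
    import org.scalatest.time.{Millis, Seconds, Span}

    class FutureAckSketchSuite extends FunSuite with ScalaFutures {
      test("acknowledgement future completes with true") {
        // Stands in for a Future-returning call such as client.requestTotalExecutors(1).
        val acknowledgementFuture: Future[Boolean] = Future.successful(true)

        whenReady(acknowledgementFuture, timeout(Span(10, Seconds)), interval(Span(10, Millis))) {
          acknowledged => assert(acknowledged)
        }
      }
    }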