author     Andrew Or <andrew@databricks.com>  2015-08-01 11:57:14 -0700
committer  Andrew Or <andrew@databricks.com>  2015-08-01 11:57:14 -0700
commit     6688ba6e68e342201b81ea09cc2c6ba216f90f3e (patch)
tree       b5b951e88275e45705fb9466f1ea3bd4ec571818
parent     c5166f7a69faeaa8a41a774c73c1ed4d4c2cf0ce (diff)
[SPARK-4751] Dynamic allocation in standalone mode
Dynamic allocation is a feature that allows a Spark application to scale the number of
executors up and down dynamically based on the workload. Support was first introduced for
YARN in Spark 1.2, and was recently extended to Mesos coarse-grained mode. Today, it is
finally supported in standalone mode as well!

I tested this locally and it works as expected. This is WIP because unit tests are coming.

Author: Andrew Or <andrew@databricks.com>

Closes #7532 from andrewor14/standalone-da and squashes the following commits:

b3c1736 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
879e928 [Andrew Or] Add end-to-end tests for standalone dynamic allocation
accc8f6 [Andrew Or] Address comments
ee686a8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
c0a2c02 [Andrew Or] Fix build after merge conflict
24149eb [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
2e762d6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
6832bd7 [Andrew Or] Add tests for scheduling with executor limit
a82e907 [Andrew Or] Fix comments
0a8be79 [Andrew Or] Simplify logic by removing the worker blacklist
b7742af [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
2eb5f3f [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
1334e9a [Andrew Or] Fix MiMa
32abe44 [Andrew Or] Fix style
58cb06f [Andrew Or] Privatize worker blacklist for cleanliness
42ac215 [Andrew Or] Clean up comments and rewrite code for readability
49702d1 [Andrew Or] Clean up shuffle files after application exits
80047aa [Andrew Or] First working implementation
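For context, a minimal sketch of how an application might opt into this feature once the
patch is in, written against the standard spark.dynamicAllocation.* and
spark.shuffle.service.enabled settings. It is not part of the patch itself; the master URL,
app name, and executor bounds below are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical driver configuration for standalone dynamic allocation.
    // The external shuffle service must be running on each Worker so that shuffle
    // files outlive the executors that wrote them (see the Worker and
    // ExternalShuffleService changes in this patch).
    object StandaloneDynamicAllocationExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("spark://master-host:7077")               // placeholder standalone master
          .setAppName("dynamic-allocation-demo")
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.dynamicAllocation.minExecutors", "1")    // illustrative bounds
          .set("spark.dynamicAllocation.maxExecutors", "10")
          .set("spark.shuffle.service.enabled", "true")
        val sc = new SparkContext(conf)
        sc.parallelize(1 to 1000000).map(_ * 2).count()        // workload that can trigger scale-up
        sc.stop()
      }
    }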
-rw-r--r--  core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala |  16
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala |  27
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala |  45
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala |  13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 144
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala |  13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala |  28
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 363
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala | 256
-rw-r--r--  project/MimaExcludes.scala |   4
13 files changed, 753 insertions(+), 170 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 43dd4a1707..ee60d697d8 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -177,16 +177,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
s"timed out after ${now - lastSeenMs} ms"))
- if (sc.supportDynamicAllocation) {
// Asynchronously kill the executor to avoid blocking the current thread
- killExecutorThread.submit(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- // Note: we want to get an executor back after expiring this one,
- // so do not simply call `sc.killExecutor` here (SPARK-8119)
- sc.killAndReplaceExecutor(executorId)
- }
- })
- }
+ killExecutorThread.submit(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ // Note: we want to get an executor back after expiring this one,
+ // so do not simply call `sc.killExecutor` here (SPARK-8119)
+ sc.killAndReplaceExecutor(executorId)
+ }
+ })
executorLastSeen.remove(executorId)
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2d8aa25d81..a1c66ef4fc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -531,8 +531,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
- assert(supportDynamicAllocation,
- "Dynamic allocation of executors is currently only supported in YARN and Mesos mode")
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
} else {
None
@@ -1362,17 +1360,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
/**
- * Return whether dynamically adjusting the amount of resources allocated to
- * this application is supported. This is currently only available for YARN
- * and Mesos coarse-grained mode.
- */
- private[spark] def supportDynamicAllocation: Boolean = {
- (master.contains("yarn")
- || master.contains("mesos")
- || _conf.getBoolean("spark.dynamicAllocation.testing", false))
- }
-
- /**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
*/
@@ -1400,8 +1387,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
): Boolean = {
- assert(supportDynamicAllocation,
- "Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
@@ -1414,12 +1399,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
- * This is currently only supported in YARN mode. Return whether the request is received.
+ * @return whether the request is received.
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
- assert(supportDynamicAllocation,
- "Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1438,12 +1421,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* through this method with new ones, it should follow up explicitly with a call to
* {{SparkContext#requestExecutors}}.
*
- * This is currently only supported in YARN mode. Return whether the request is received.
+ * @return whether the request is received.
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
- assert(supportDynamicAllocation,
- "Killing executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
@@ -1462,7 +1443,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* through this method with a new one, it should follow up explicitly with a call to
* {{SparkContext#requestExecutors}}.
*
- * This is currently only supported in YARN mode. Return whether the request is received.
+ * @return whether the request is received.
*/
@DeveloperApi
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
@@ -1479,7 +1460,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* can steal the window of opportunity and acquire this application's resources in the
* mean time.
*
- * This is currently only supported in YARN mode. Return whether the request is received.
+ * @return whether the request is received.
*/
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
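The assertions removed above mean the executor-scaling developer API is no longer gated on
YARN or Mesos. As a rough, hypothetical illustration (the counts and executor IDs are
placeholders), these methods can now be exercised from spark-shell against a standalone master:

    // spark-shell --master spark://master-host:7077   (placeholder master URL)
    // `sc` is the SparkContext provided by the shell. Dynamic allocation does not need
    // to be enabled for these @DeveloperApi calls; with this patch they reach the
    // standalone Master through the scheduler backend.
    val granted: Boolean = sc.requestExecutors(2)      // ask for 2 additional executors
    val killed: Boolean = sc.killExecutors(Seq("0"))   // release executor "0"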
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 12727de9b4..d8084a5765 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -107,6 +107,10 @@ private[deploy] object DeployMessages {
case class MasterChangeAcknowledged(appId: String)
+ case class RequestExecutors(appId: String, requestedTotal: Int)
+
+ case class KillExecutors(appId: String, executorIds: Seq[String])
+
// Master to AppClient
case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 09973a0a2c..4089c3e771 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -70,6 +70,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
server = transportContext.createServer(port, bootstraps)
}
+ /** Clean up all shuffle files associated with an application that has exited. */
+ def applicationRemoved(appId: String): Unit = {
+ blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */)
+ }
+
def stop() {
if (server != null) {
server.close()
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index a659abf703..7576a2985e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -197,6 +197,22 @@ private[spark] class AppClient(
sendToMaster(UnregisterApplication(appId))
context.reply(true)
stop()
+
+ case r: RequestExecutors =>
+ master match {
+ case Some(m) => context.reply(m.askWithRetry[Boolean](r))
+ case None =>
+ logWarning("Attempted to request executors before registering with Master.")
+ context.reply(false)
+ }
+
+ case k: KillExecutors =>
+ master match {
+ case Some(m) => context.reply(m.askWithRetry[Boolean](k))
+ case None =>
+ logWarning("Attempted to kill executors before registering with Master.")
+ context.reply(false)
+ }
}
override def onDisconnected(address: RpcAddress): Unit = {
@@ -257,4 +273,33 @@ private[spark] class AppClient(
endpoint = null
}
}
+
+ /**
+ * Request executors from the Master by specifying the total number desired,
+ * including existing pending and running executors.
+ *
+ * @return whether the request is acknowledged.
+ */
+ def requestTotalExecutors(requestedTotal: Int): Boolean = {
+ if (endpoint != null && appId != null) {
+ endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))
+ } else {
+ logWarning("Attempted to request executors before driver fully initialized.")
+ false
+ }
+ }
+
+ /**
+ * Kill the given list of executors through the Master.
+ * @return whether the kill request is acknowledged.
+ */
+ def killExecutors(executorIds: Seq[String]): Boolean = {
+ if (endpoint != null && appId != null) {
+ endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds))
+ } else {
+ logWarning("Attempted to kill executors before driver fully initialized.")
+ false
+ }
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index aa54ed9360..b40d20f9f7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -22,7 +22,6 @@ import java.util.Date
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
@@ -43,6 +42,11 @@ private[spark] class ApplicationInfo(
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
+ // A cap on the number of executors this application can have at any given time.
+ // By default, this is infinite. Only after the first allocation request is issued by the
+ // application will this be set to a finite value. This is used for dynamic allocation.
+ @transient private[master] var executorLimit: Int = _
+
@transient private var nextExecutorId: Int = _
init()
@@ -60,6 +64,7 @@ private[spark] class ApplicationInfo(
appSource = new ApplicationSource(this)
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc]
+ executorLimit = Integer.MAX_VALUE
}
private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -116,6 +121,12 @@ private[spark] class ApplicationInfo(
state != ApplicationState.WAITING && state != ApplicationState.RUNNING
}
+ /**
+ * Return the limit on the number of executors this application can have.
+ * For testing only.
+ */
+ private[deploy] def getExecutorLimit: Int = executorLimit
+
def duration: Long = {
if (endTime != -1) {
endTime - startTime
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 51b3f0dead..e38e437fe1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -45,7 +45,7 @@ import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
-private[master] class Master(
+private[deploy] class Master(
override val rpcEnv: RpcEnv,
address: RpcAddress,
webUiPort: Int,
@@ -468,6 +468,13 @@ private[master] class Master(
case BoundPortsRequest => {
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
}
+
+ case RequestExecutors(appId, requestedTotal) =>
+ context.reply(handleRequestExecutors(appId, requestedTotal))
+
+ case KillExecutors(appId, executorIds) =>
+ val formattedExecutorIds = formatExecutorIds(executorIds)
+ context.reply(handleKillExecutors(appId, formattedExecutorIds))
}
override def onDisconnected(address: RpcAddress): Unit = {
@@ -563,32 +570,49 @@ private[master] class Master(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
- // If the number of cores per executor is not specified, then we can just schedule
- // 1 core at a time since we expect a single executor to be launched on each worker
- val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+ val coresPerExecutor = app.desc.coresPerExecutor
+ val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
+ val oneExecutorPerWorker = coresPerExecutor.isEmpty
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
- val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
+ val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
- var freeWorkers = (0 until numUsable).toIndexedSeq
+ /** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutor(pos: Int): Boolean = {
- usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
- usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
+ // If we allow multiple executors per worker, then we can always launch new executors.
+ // Otherwise, we may have already started assigning cores to the executor on this worker.
+ val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
+ val underLimit =
+ if (launchingNewExecutor) {
+ assignedExecutors.sum + app.executors.size < app.executorLimit
+ } else {
+ true
+ }
+ val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
+ usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
+ usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
+ coresToAssign >= minCoresPerExecutor &&
+ underLimit
}
- while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
- freeWorkers = freeWorkers.filter(canLaunchExecutor)
+ // Keep launching executors until no more workers can accommodate any more
+ // executors, or until we have reached this application's limit
+ var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
+ while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
- while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
- coresToAssign -= coresPerExecutor
- assignedCores(pos) += coresPerExecutor
- // If cores per executor is not set, we are assigning 1 core at a time
- // without actually meaning to launch 1 executor for each core assigned
- if (app.desc.coresPerExecutor.isDefined) {
- assignedMemory(pos) += memoryPerExecutor
+ while (keepScheduling && canLaunchExecutor(pos)) {
+ coresToAssign -= minCoresPerExecutor
+ assignedCores(pos) += minCoresPerExecutor
+
+ // If we are launching one executor per worker, then every iteration assigns 1 core
+ // to the executor. Otherwise, every iteration assigns cores to a new executor.
+ if (oneExecutorPerWorker) {
+ assignedExecutors(pos) = 1
+ } else {
+ assignedExecutors(pos) += 1
}
// Spreading out an application means spreading out its executors across as
@@ -600,6 +624,7 @@ private[master] class Master(
}
}
}
+ freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}
@@ -785,9 +810,7 @@ private[master] class Master(
rebuildSparkUI(app)
for (exec <- app.executors.values) {
- exec.worker.removeExecutor(exec)
- exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
- exec.state = ExecutorState.KILLED
+ killExecutor(exec)
}
app.markFinished(state)
if (state != ApplicationState.FINISHED) {
@@ -804,6 +827,87 @@ private[master] class Master(
}
/**
+ * Handle a request to set the target number of executors for this application.
+ *
+ * If the executor limit is adjusted upwards, new executors will be launched provided
+ * that there are workers with sufficient resources. If it is adjusted downwards, however,
+ * we do not kill existing executors until we explicitly receive a kill request.
+ *
+ * @return whether the application has previously registered with this Master.
+ */
+ private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = {
+ idToApp.get(appId) match {
+ case Some(appInfo) =>
+ logInfo(s"Application $appId requested to set total executors to $requestedTotal.")
+ appInfo.executorLimit = requestedTotal
+ schedule()
+ true
+ case None =>
+ logWarning(s"Unknown application $appId requested $requestedTotal total executors.")
+ false
+ }
+ }
+
+ /**
+ * Handle a kill request from the given application.
+ *
+ * This method assumes the executor limit has already been adjusted downwards through
+ * a separate [[RequestExecutors]] message, such that we do not launch new executors
+ * immediately after the old ones are removed.
+ *
+ * @return whether the application has previously registered with this Master.
+ */
+ private def handleKillExecutors(appId: String, executorIds: Seq[Int]): Boolean = {
+ idToApp.get(appId) match {
+ case Some(appInfo) =>
+ logInfo(s"Application $appId requests to kill executors: " + executorIds.mkString(", "))
+ val (known, unknown) = executorIds.partition(appInfo.executors.contains)
+ known.foreach { executorId =>
+ val desc = appInfo.executors(executorId)
+ appInfo.removeExecutor(desc)
+ killExecutor(desc)
+ }
+ if (unknown.nonEmpty) {
+ logWarning(s"Application $appId attempted to kill non-existent executors: "
+ + unknown.mkString(", "))
+ }
+ schedule()
+ true
+ case None =>
+ logWarning(s"Unregistered application $appId requested us to kill executors!")
+ false
+ }
+ }
+
+ /**
+ * Cast the given executor IDs to integers and filter out the ones that fail.
+ *
+ * All executor IDs should be integers since we launched these executors. However,
+ * the kill interface on the driver side accepts arbitrary strings, so we need to
+ * handle non-integer executor IDs just to be safe.
+ */
+ private def formatExecutorIds(executorIds: Seq[String]): Seq[Int] = {
+ executorIds.flatMap { executorId =>
+ try {
+ Some(executorId.toInt)
+ } catch {
+ case e: NumberFormatException =>
+ logError(s"Encountered executor with a non-integer ID: $executorId. Ignoring")
+ None
+ }
+ }
+ }
+
+ /**
+ * Ask the worker on which the specified executor is launched to kill the executor.
+ */
+ private def killExecutor(exec: ExecutorDesc): Unit = {
+ exec.worker.removeExecutor(exec)
+ exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
+ exec.state = ExecutorState.KILLED
+ }
+
+ /**
* Rebuild a new SparkUI from the given application's event logs.
* Return the UI if successful, else None
*/
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0276c24f85..c82a7ccab5 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -40,7 +40,7 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
-private[worker] class Worker(
+private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
@@ -553,6 +553,7 @@ private[worker] class Worker(
Utils.deleteRecursively(new File(dir))
}
}
+ shuffleService.applicationRemoved(id)
}
}
@@ -660,6 +661,9 @@ private[worker] class Worker(
}
private[deploy] object Worker extends Logging {
+ val SYSTEM_NAME = "sparkWorker"
+ val ENDPOINT_NAME = "Worker"
+
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
@@ -681,13 +685,12 @@ private[deploy] object Worker extends Logging {
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
- val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
- val actorName = "Worker"
+ val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
- rpcEnv.setupEndpoint(actorName, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses,
- systemName, actorName, workDir, conf, securityMgr))
+ rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
+ masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index bd89160af4..6acf8a9a5e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -134,7 +134,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
} else {
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
- context.reply(RegisteredExecutor)
addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
@@ -149,6 +148,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
+ // Note: some tests expect the reply to come after we put the executor in the map
+ context.reply(RegisteredExecutor)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
@@ -435,7 +436,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Kill the given list of executors through the cluster manager.
- * Return whether the kill request is acknowledged.
+ * @return whether the kill request is acknowledged.
*/
protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 687ae96204..bbe51b4a09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -152,6 +152,34 @@ private[spark] class SparkDeploySchedulerBackend(
super.applicationId
}
+ /**
+ * Request executors from the Master by specifying the total number desired,
+ * including existing pending and running executors.
+ *
+ * @return whether the request is acknowledged.
+ */
+ protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ Option(client) match {
+ case Some(c) => c.requestTotalExecutors(requestedTotal)
+ case None =>
+ logWarning("Attempted to request executors before driver fully initialized.")
+ false
+ }
+ }
+
+ /**
+ * Kill the given list of executors through the Master.
+ * @return whether the kill request is acknowledged.
+ */
+ protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ Option(client) match {
+ case Some(c) => c.killExecutors(executorIds)
+ case None =>
+ logWarning("Attempted to kill executors before driver fully initialized.")
+ false
+ }
+ }
+
private def waitForRegistration() = {
registrationBarrier.acquire()
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
new file mode 100644
index 0000000000..08c41a897a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.cluster._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
+
+/**
+ * End-to-end tests for dynamic allocation in standalone mode.
+ */
+class StandaloneDynamicAllocationSuite
+ extends SparkFunSuite
+ with LocalSparkContext
+ with BeforeAndAfterAll {
+
+ private val numWorkers = 2
+ private val conf = new SparkConf()
+ private val securityManager = new SecurityManager(conf)
+
+ private var masterRpcEnv: RpcEnv = null
+ private var workerRpcEnvs: Seq[RpcEnv] = null
+ private var master: Master = null
+ private var workers: Seq[Worker] = null
+
+ /**
+ * Start the local cluster.
+ * Note: local-cluster mode is insufficient because we want a reference to the Master.
+ */
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
+ workerRpcEnvs = (0 until numWorkers).map { i =>
+ RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
+ }
+ master = makeMaster()
+ workers = makeWorkers(10, 2048)
+ }
+
+ override def afterAll(): Unit = {
+ masterRpcEnv.shutdown()
+ workerRpcEnvs.foreach(_.shutdown())
+ master.stop()
+ workers.foreach(_.stop())
+ masterRpcEnv = null
+ workerRpcEnvs = null
+ master = null
+ workers = null
+ super.afterAll()
+ }
+
+ test("dynamic allocation default behavior") {
+ sc = new SparkContext(appConf)
+ val appId = sc.applicationId
+ assert(master.apps.size === 1)
+ assert(master.apps.head.id === appId)
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ // kill all executors
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request 1
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.getExecutorLimit === 1)
+ // request 1 more
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.getExecutorLimit === 2)
+ // request 1 more; this one won't go through
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.getExecutorLimit === 3)
+ // kill all existing executors; we should end up with 3 - 2 = 1 executor
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.getExecutorLimit === 1)
+ // kill all executors again; this time we'll have 1 - 1 = 0 executors left
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request many more; this increases the limit well beyond the cluster capacity
+ assert(sc.requestExecutors(1000))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.getExecutorLimit === 1000)
+ }
+
+ test("dynamic allocation with max cores <= cores per worker") {
+ sc = new SparkContext(appConf.set("spark.cores.max", "8"))
+ val appId = sc.applicationId
+ assert(master.apps.size === 1)
+ assert(master.apps.head.id === appId)
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+ assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ // kill all executors
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request 1
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.executors.values.head.cores === 8)
+ assert(master.apps.head.getExecutorLimit === 1)
+ // request 1 more; this one won't go through because we're already at max cores.
+ // This highlights a limitation of using dynamic allocation with max cores WITHOUT
+ // setting cores per executor: once an application scales down and then scales back
+ // up, its executors may not be spread out anymore!
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.getExecutorLimit === 2)
+ // request 1 more; this one also won't go through for the same reason
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.getExecutorLimit === 3)
+ // kill all existing executors; we should end up with 3 - 1 = 2 executors
+ // Note: we scheduled these executors together, so their cores should be evenly distributed
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+ assert(master.apps.head.getExecutorLimit === 2)
+ // kill all executors again; this time we'll have 2 - 2 = 0 executors left
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request many more; this increases the limit well beyond the cluster capacity
+ assert(sc.requestExecutors(1000))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+ assert(master.apps.head.getExecutorLimit === 1000)
+ }
+
+ test("dynamic allocation with max cores > cores per worker") {
+ sc = new SparkContext(appConf.set("spark.cores.max", "16"))
+ val appId = sc.applicationId
+ assert(master.apps.size === 1)
+ assert(master.apps.head.id === appId)
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+ assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ // kill all executors
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request 1
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.executors.values.head.cores === 10)
+ assert(master.apps.head.getExecutorLimit === 1)
+ // request 1 more
+ // Note: the cores are not evenly distributed because we scheduled these executors 1 by 1
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
+ assert(master.apps.head.getExecutorLimit === 2)
+ // request 1 more; this one won't go through
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.getExecutorLimit === 3)
+ // kill all existing executors; we should end up with 3 - 2 = 1 executor
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.executors.values.head.cores === 10)
+ assert(master.apps.head.getExecutorLimit === 1)
+ // kill all executors again; this time we'll have 1 - 1 = 0 executors left
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request many more; this increases the limit well beyond the cluster capacity
+ assert(sc.requestExecutors(1000))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+ assert(master.apps.head.getExecutorLimit === 1000)
+ }
+
+ test("dynamic allocation with cores per executor") {
+ sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
+ val appId = sc.applicationId
+ assert(master.apps.size === 1)
+ assert(master.apps.head.id === appId)
+ assert(master.apps.head.executors.size === 10) // 20 cores total
+ assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ // kill all executors
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request 1
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.getExecutorLimit === 1)
+ // request 3 more
+ assert(sc.requestExecutors(3))
+ assert(master.apps.head.executors.size === 4)
+ assert(master.apps.head.getExecutorLimit === 4)
+ // request 10 more; only 6 will go through
+ assert(sc.requestExecutors(10))
+ assert(master.apps.head.executors.size === 10)
+ assert(master.apps.head.getExecutorLimit === 14)
+ // kill 2 executors; we should get 2 back immediately
+ assert(killNExecutors(sc, 2))
+ assert(master.apps.head.executors.size === 10)
+ assert(master.apps.head.getExecutorLimit === 12)
+ // kill 4 executors; we should end up with 12 - 4 = 8 executors
+ assert(killNExecutors(sc, 4))
+ assert(master.apps.head.executors.size === 8)
+ assert(master.apps.head.getExecutorLimit === 8)
+ // kill all executors; this time we'll have 8 - 8 = 0 executors left
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request many more; this increases the limit well beyond the cluster capacity
+ assert(sc.requestExecutors(1000))
+ assert(master.apps.head.executors.size === 10)
+ assert(master.apps.head.getExecutorLimit === 1000)
+ }
+
+ test("dynamic allocation with cores per executor AND max cores") {
+ sc = new SparkContext(appConf
+ .set("spark.executor.cores", "2")
+ .set("spark.cores.max", "8"))
+ val appId = sc.applicationId
+ assert(master.apps.size === 1)
+ assert(master.apps.head.id === appId)
+ assert(master.apps.head.executors.size === 4) // 8 cores total
+ assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ // kill all executors
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request 1
+ assert(sc.requestExecutors(1))
+ assert(master.apps.head.executors.size === 1)
+ assert(master.apps.head.getExecutorLimit === 1)
+ // request 3 more
+ assert(sc.requestExecutors(3))
+ assert(master.apps.head.executors.size === 4)
+ assert(master.apps.head.getExecutorLimit === 4)
+ // request 10 more; none will go through
+ assert(sc.requestExecutors(10))
+ assert(master.apps.head.executors.size === 4)
+ assert(master.apps.head.getExecutorLimit === 14)
+ // kill all executors; 4 executors will be launched immediately
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 4)
+ assert(master.apps.head.getExecutorLimit === 10)
+ // ... and again
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 4)
+ assert(master.apps.head.getExecutorLimit === 6)
+ // ... and again; now we end up with 6 - 4 = 2 executors left
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 2)
+ assert(master.apps.head.getExecutorLimit === 2)
+ // ... and again; this time we have 2 - 2 = 0 executors left
+ assert(killAllExecutors(sc))
+ assert(master.apps.head.executors.size === 0)
+ assert(master.apps.head.getExecutorLimit === 0)
+ // request many more; this increases the limit well beyond the cluster capacity
+ assert(sc.requestExecutors(1000))
+ assert(master.apps.head.executors.size === 4)
+ assert(master.apps.head.getExecutorLimit === 1000)
+ }
+
+ // ===============================
+ // | Utility methods for testing |
+ // ===============================
+
+ /** Return a SparkConf for applications that want to talk to our Master. */
+ private def appConf: SparkConf = {
+ new SparkConf()
+ .setMaster(masterRpcEnv.address.toSparkURL)
+ .setAppName("test")
+ .set("spark.executor.memory", "256m")
+ }
+
+ /** Make a master to which our application will send executor requests. */
+ private def makeMaster(): Master = {
+ val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf)
+ masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+ master
+ }
+
+ /** Make a few workers that talk to our master. */
+ private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
+ (0 until numWorkers).map { i =>
+ val rpcEnv = workerRpcEnvs(i)
+ val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
+ Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager)
+ rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
+ worker
+ }
+ }
+
+ /** Kill all executors belonging to this application. */
+ private def killAllExecutors(sc: SparkContext): Boolean = {
+ killNExecutors(sc, Int.MaxValue)
+ }
+
+ /** Kill N executors belonging to this application. */
+ private def killNExecutors(sc: SparkContext, n: Int): Boolean = {
+ syncExecutors(sc)
+ sc.killExecutors(getExecutorIds(sc).take(n))
+ }
+
+ /**
+ * Return a list of executor IDs belonging to this application.
+ *
+ * Note that we must use the executor IDs according to the Master, which has the most
+ * up-to-date view. We cannot rely on the executor IDs according to the driver because
+ * we don't wait for executors to register; waiting would make the tests take much longer to run.
+ */
+ private def getExecutorIds(sc: SparkContext): Seq[String] = {
+ assert(master.idToApp.contains(sc.applicationId))
+ master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq
+ }
+
+ /**
+ * Sync executor IDs between the driver and the Master.
+ *
+ * This allows us to avoid waiting for new executors to register with the driver before
+ * we submit a request to kill them. This must be called before each kill request.
+ */
+ private def syncExecutors(sc: SparkContext): Unit = {
+ val driverExecutors = sc.getExecutorStorageStatus
+ .map(_.blockManagerId.executorId)
+ .filter { _ != SparkContext.DRIVER_IDENTIFIER}
+ val masterExecutors = getExecutorIds(sc)
+ val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
+ missingExecutors.foreach { id =>
+ // Fake an executor registration so the driver knows about us
+ val port = System.currentTimeMillis % 65536
+ val endpointRef = mock(classOf[RpcEndpointRef])
+ val mockAddress = mock(classOf[RpcAddress])
+ when(endpointRef.address).thenReturn(mockAddress)
+ val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty)
+ val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+ backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
+ }
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 4d7016d1e5..30780a0da7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -120,7 +120,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
}
- test("Master & worker web ui available") {
+ test("master/worker web ui available") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
@@ -144,174 +144,202 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
}
test("basic scheduling - spread out") {
- testBasicScheduling(spreadOut = true)
+ basicScheduling(spreadOut = true)
}
test("basic scheduling - no spread out") {
- testBasicScheduling(spreadOut = false)
+ basicScheduling(spreadOut = false)
}
test("scheduling with max cores - spread out") {
- testSchedulingWithMaxCores(spreadOut = true)
+ schedulingWithMaxCores(spreadOut = true)
}
test("scheduling with max cores - no spread out") {
- testSchedulingWithMaxCores(spreadOut = false)
+ schedulingWithMaxCores(spreadOut = false)
}
test("scheduling with cores per executor - spread out") {
- testSchedulingWithCoresPerExecutor(spreadOut = true)
+ schedulingWithCoresPerExecutor(spreadOut = true)
}
test("scheduling with cores per executor - no spread out") {
- testSchedulingWithCoresPerExecutor(spreadOut = false)
+ schedulingWithCoresPerExecutor(spreadOut = false)
}
test("scheduling with cores per executor AND max cores - spread out") {
- testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
+ schedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
}
test("scheduling with cores per executor AND max cores - no spread out") {
- testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
+ schedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
}
- private def testBasicScheduling(spreadOut: Boolean): Unit = {
+ test("scheduling with executor limit - spread out") {
+ schedulingWithExecutorLimit(spreadOut = true)
+ }
+
+ test("scheduling with executor limit - no spread out") {
+ schedulingWithExecutorLimit(spreadOut = false)
+ }
+
+ test("scheduling with executor limit AND max cores - spread out") {
+ schedulingWithExecutorLimitAndMaxCores(spreadOut = true)
+ }
+
+ test("scheduling with executor limit AND max cores - no spread out") {
+ schedulingWithExecutorLimitAndMaxCores(spreadOut = false)
+ }
+
+ test("scheduling with executor limit AND cores per executor - spread out") {
+ schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = true)
+ }
+
+ test("scheduling with executor limit AND cores per executor - no spread out") {
+ schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = false)
+ }
+
+ test("scheduling with executor limit AND cores per executor AND max cores - spread out") {
+ schedulingWithEverything(spreadOut = true)
+ }
+
+ test("scheduling with executor limit AND cores per executor AND max cores - no spread out") {
+ schedulingWithEverything(spreadOut = false)
+ }
+
+ private def basicScheduling(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo = makeAppInfo(1024)
- val workerInfo = makeWorkerInfo(4096, 10)
- val workerInfos = Array(workerInfo, workerInfo, workerInfo)
- val scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- assert(scheduledCores(0) === 10)
- assert(scheduledCores(1) === 10)
- assert(scheduledCores(2) === 10)
+ val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores === Array(10, 10, 10))
}
- private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
+ private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
- val workerInfo = makeWorkerInfo(4096, 10)
- val workerInfos = Array(workerInfo, workerInfo, workerInfo)
- var scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- // With spreading out, each worker should be assigned a few cores
- if (spreadOut) {
- assert(scheduledCores(0) === 3)
- assert(scheduledCores(1) === 3)
- assert(scheduledCores(2) === 2)
- } else {
- // Without spreading out, the cores should be concentrated on the first worker
- assert(scheduledCores(0) === 8)
- assert(scheduledCores(1) === 0)
- assert(scheduledCores(2) === 0)
- }
- // Now test the same thing with max cores > cores per worker
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
if (spreadOut) {
- assert(scheduledCores(0) === 6)
- assert(scheduledCores(1) === 5)
- assert(scheduledCores(2) === 5)
+ assert(scheduledCores1 === Array(3, 3, 2))
+ assert(scheduledCores2 === Array(6, 5, 5))
} else {
- // Without spreading out, the first worker should be fully booked,
- // and the leftover cores should spill over to the second worker only.
- assert(scheduledCores(0) === 10)
- assert(scheduledCores(1) === 6)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores1 === Array(8, 0, 0))
+ assert(scheduledCores2 === Array(10, 6, 0))
}
}
- private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
+ private def schedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
- val workerInfo = makeWorkerInfo(4096, 10)
- val workerInfos = Array(workerInfo, workerInfo, workerInfo)
- // Each worker should end up with 4 executors with 2 cores each
- // This should be 4 because of the memory restriction on each worker
- var scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- assert(scheduledCores(0) === 8)
- assert(scheduledCores(1) === 8)
- assert(scheduledCores(2) === 8)
- // Now test the same thing without running into the worker memory limit
- // Each worker should now end up with 5 executors with 2 cores each
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- assert(scheduledCores(0) === 10)
- assert(scheduledCores(1) === 10)
- assert(scheduledCores(2) === 10)
- // Now test the same thing with a cores per executor that 10 is not divisible by
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
- assert(scheduledCores(0) === 9)
- assert(scheduledCores(1) === 9)
- assert(scheduledCores(2) === 9)
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(8, 8, 8)) // 4 * 2 because of memory limits
+ assert(scheduledCores2 === Array(10, 10, 10)) // 5 * 2
+ assert(scheduledCores3 === Array(9, 9, 9)) // 3 * 3
}
// Sorry for the long method name!
- private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
+ private def schedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
- val workerInfo = makeWorkerInfo(4096, 10)
- val workerInfos = Array(workerInfo, workerInfo, workerInfo)
- // We should only launch two executors, each with exactly 2 cores
- var scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+ if (spreadOut) {
+ assert(scheduledCores1 === Array(2, 2, 0))
+ assert(scheduledCores2 === Array(8, 6, 6))
+ assert(scheduledCores3 === Array(6, 6, 6))
+ } else {
+ assert(scheduledCores1 === Array(4, 0, 0))
+ assert(scheduledCores2 === Array(10, 10, 0))
+ assert(scheduledCores3 === Array(9, 9, 0))
+ }
+ }
+
+ private def schedulingWithExecutorLimit(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(256)
+ appInfo.executorLimit = 0
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 2
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 5
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
+ assert(scheduledCores2 === Array(10, 10, 0))
+ assert(scheduledCores3 === Array(10, 10, 10))
+ }
+
+ private def schedulingWithExecutorLimitAndMaxCores(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(256, maxCores = Some(16))
+ appInfo.executorLimit = 0
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 2
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 5
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
if (spreadOut) {
- assert(scheduledCores(0) === 2)
- assert(scheduledCores(1) === 2)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores2 === Array(8, 8, 0))
+ assert(scheduledCores3 === Array(6, 5, 5))
} else {
- assert(scheduledCores(0) === 4)
- assert(scheduledCores(1) === 0)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores2 === Array(10, 6, 0))
+ assert(scheduledCores3 === Array(10, 6, 0))
}
- // Test max cores > number of cores per worker
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
+ }
+
+ private def schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(256, coresPerExecutor = Some(4))
+ appInfo.executorLimit = 0
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 2
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 5
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
if (spreadOut) {
- assert(scheduledCores(0) === 8)
- assert(scheduledCores(1) === 6)
- assert(scheduledCores(2) === 6)
+ assert(scheduledCores2 === Array(4, 4, 0))
} else {
- assert(scheduledCores(0) === 10)
- assert(scheduledCores(1) === 10)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores2 === Array(8, 0, 0))
}
- // Test max cores > number of cores per worker AND
- // a cores per executor that is 10 is not divisible by
- scheduledCores = master.invokePrivate(
- _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
- assert(scheduledCores.length === 3)
+ assert(scheduledCores3 === Array(8, 8, 4))
+ }
+
+ // Everything being: executor limit + cores per executor + max cores
+ private def schedulingWithEverything(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(256, coresPerExecutor = Some(4), maxCores = Some(18))
+ appInfo.executorLimit = 0
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 2
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.executorLimit = 5
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
if (spreadOut) {
- assert(scheduledCores(0) === 6)
- assert(scheduledCores(1) === 6)
- assert(scheduledCores(2) === 6)
+ assert(scheduledCores2 === Array(4, 4, 0))
+ assert(scheduledCores3 === Array(8, 4, 4))
} else {
- assert(scheduledCores(0) === 9)
- assert(scheduledCores(1) === 9)
- assert(scheduledCores(2) === 0)
+ assert(scheduledCores2 === Array(8, 0, 0))
+ assert(scheduledCores3 === Array(8, 8, 0))
}
}
- // ===============================
- // | Utility methods for testing |
- // ===============================
+ // ==========================================
+ // | Utility methods and fields for testing |
+ // ==========================================
private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
+ private val workerInfo = makeWorkerInfo(4096, 10)
+ private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
private def makeMaster(conf: SparkConf = new SparkConf): Master = {
val securityMgr = new SecurityManager(conf)
@@ -335,4 +363,12 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address")
}
+ private def scheduleExecutorsOnWorkers(
+ master: Master,
+ appInfo: ApplicationInfo,
+ workerInfos: Array[WorkerInfo],
+ spreadOut: Boolean): Array[Int] = {
+ master.invokePrivate(_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
+ }
+
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fa36629c37..f9384c4c3c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -151,6 +151,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException")
+ ) ++ Seq(
+ // SPARK-4751 Dynamic allocation for standalone mode
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.SparkContext.supportDynamicAllocation")
)
case v if v.startsWith("1.4") =>