author     Andrew Or <andrew@databricks.com>    2014-12-18 17:37:42 -0800
committer  Andrew Or <andrew@databricks.com>    2014-12-18 17:42:02 -0800
commit     ca37639aa1b537d0f9b56bf1362bf293635e235c (patch)
tree       fe1dd6039434d75bd0a7916d75f38986943a3a2c /core
parent     075b399c59b508251f4fb259e7b0c13b79ff5883 (diff)
[SPARK-4754] Refactor SparkContext into ExecutorAllocationClient
This is such that the `ExecutorAllocationManager` does not take in the `SparkContext`
with all of its dependencies as an argument. This prevents future developers of this
class from tying it down further to the `SparkContext`, which has really become quite
a monstrous object.

cc'ing pwendell, who originally suggested this, and JoshRosen, who may have thoughts
about the trait mix-in style of `SparkContext`.

Author: Andrew Or <andrew@databricks.com>

Closes #3614 from andrewor14/dynamic-allocation-sc and squashes the following commits:

187070d [Andrew Or] Merge branch 'master' of github.com:apache/spark into dynamic-allocation-sc
59baf6c [Andrew Or] Merge branch 'master' of github.com:apache/spark into dynamic-allocation-sc
347a348 [Andrew Or] Refactor SparkContext into ExecutorAllocationClient

(cherry picked from commit 9804a759b68f56eceb8a2f4ea90f76a92b5f9f67)
Signed-off-by: Andrew Or <andrew@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala                          | 42
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                         | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                      | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala   |  8
4 files changed, 59 insertions(+), 15 deletions(-)
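
The pattern the commit message describes, reduced to a minimal illustrative sketch (the names `Client` and `Manager` below are placeholders, not code from the patch): the manager depends on a narrow trait rather than the whole `SparkContext`, so anything implementing the trait can be injected in its place.

    // Illustrative sketch only -- simplified from the patch.
    trait Client {                         // stands in for ExecutorAllocationClient
      def requestExecutors(numAdditional: Int): Boolean
    }

    class Manager(client: Client) {        // stands in for ExecutorAllocationManager
      // Only this narrow capability is visible; the rest of SparkContext cannot leak in.
      def scaleUp(n: Int): Boolean = client.requestExecutors(n)
    }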
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
new file mode 100644
index 0000000000..a46a81eabd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * A client that communicates with the cluster manager to request or kill executors.
+ */
+private[spark] trait ExecutorAllocationClient {
+
+ /**
+ * Request an additional number of executors from the cluster manager.
+ * Return whether the request is acknowledged by the cluster manager.
+ */
+ def requestExecutors(numAdditionalExecutors: Int): Boolean
+
+ /**
+ * Request that the cluster manager kill the specified executors.
+ * Return whether the request is acknowledged by the cluster manager.
+ */
+ def killExecutors(executorIds: Seq[String]): Boolean
+
+ /**
+ * Request that the cluster manager kill the specified executor.
+ * Return whether the request is acknowledged by the cluster manager.
+ */
+ def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
+}
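
A practical consequence of the trait: callers such as `ExecutorAllocationManager` can now be handed a lightweight stand-in. The stub below is a hypothetical example (not part of this patch), assuming only the trait defined above; note that `killExecutor` needs no override because the trait supplies a default implementation.

    // Hypothetical test stub: pretend the cluster manager acknowledges everything.
    private class AlwaysAckClient extends ExecutorAllocationClient {
      override def requestExecutors(numAdditionalExecutors: Int): Boolean = true
      override def killExecutors(executorIds: Seq[String]): Boolean = true
      // killExecutor(executorId) is inherited from the trait's default.
    }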
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 88adb89299..e9e90e3f2f 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -60,11 +60,13 @@ import org.apache.spark.scheduler._
* spark.dynamicAllocation.executorIdleTimeout (K) -
* If an executor has been idle for this duration, remove it
*/
-private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging {
+private[spark] class ExecutorAllocationManager(
+ client: ExecutorAllocationClient,
+ listenerBus: LiveListenerBus,
+ conf: SparkConf)
+ extends Logging {
import ExecutorAllocationManager._
- private val conf = sc.conf
-
// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
@@ -168,7 +170,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
* Register for scheduler callbacks to decide when to add and remove executors.
*/
def start(): Unit = {
- sc.addSparkListener(listener)
+ listenerBus.addListener(listener)
startPolling()
}
@@ -253,7 +255,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
- val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
+ val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
if (addRequestAcknowledged) {
logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
s"tasks are backlogged (new desired total will be $newTotalExecutors)")
@@ -295,7 +297,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
}
// Send a request to the backend to kill this executor
- val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
+ val removeRequestAcknowledged = testing || client.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 32191c601c..9d3d1e1d2b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -64,7 +64,7 @@ import org.apache.spark.util._
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
-class SparkContext(config: SparkConf) extends Logging {
+class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
@@ -361,7 +361,7 @@ class SparkContext(config: SparkConf) extends Logging {
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
- Some(new ExecutorAllocationManager(this))
+ Some(new ExecutorAllocationManager(this, listenerBus, conf))
} else {
None
}
@@ -990,7 +990,7 @@ class SparkContext(config: SparkConf) extends Logging {
* This is currently only supported in Yarn mode. Return whether the request is received.
*/
@DeveloperApi
- def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+ override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1006,7 +1006,7 @@ class SparkContext(config: SparkConf) extends Logging {
* This is currently only supported in Yarn mode. Return whether the request is received.
*/
@DeveloperApi
- def killExecutors(executorIds: Seq[String]): Boolean = {
+ override def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
@@ -1022,7 +1022,7 @@ class SparkContext(config: SparkConf) extends Logging {
* This is currently only supported in Yarn mode. Return whether the request is received.
*/
@DeveloperApi
- def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
+ override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
/** The version of Spark on which this application is running. */
def version = SPARK_VERSION
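
Because `SparkContext` now mixes the trait in, the existing `@DeveloperApi` surface is unchanged for callers; only the `override` modifiers are new. Hypothetical usage on an existing `SparkContext` named `sc` (per the doc comments above, only honored in YARN mode at this point):

    val acknowledged = sc.requestExecutors(2)  // true if the cluster manager received the request
    sc.killExecutor("3")                       // routed through the trait default: killExecutors(Seq("3"))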
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 29cd34429b..fe9914b50b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
+import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
@@ -42,7 +42,7 @@ import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Ut
*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)
- extends SchedulerBackend with Logging
+ extends ExecutorAllocationClient with SchedulerBackend with Logging
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
@@ -307,7 +307,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
* Request an additional number of executors from the cluster manager.
* Return whether the request is acknowledged.
*/
- final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+ final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
logDebug(s"Number of pending executors is now $numPendingExecutors")
numPendingExecutors += numAdditionalExecutors
@@ -334,7 +334,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
* Request that the cluster manager kill the specified executors.
* Return whether the kill request is acknowledged.
*/
- final def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+ final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val filteredExecutorIds = new ArrayBuffer[String]
executorIds.foreach { id =>