aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/SparkContext.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkContext.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala24
1 files changed, 15 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1981ad5671..f58037e100 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -73,7 +73,7 @@ import org.apache.spark.util._
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
-class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
+class SparkContext(config: SparkConf) extends Logging {
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
@@ -534,7 +534,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
- Some(new ExecutorAllocationManager(this, listenerBus, _conf))
+ schedulerBackend match {
+ case b: ExecutorAllocationClient =>
+ Some(new ExecutorAllocationManager(
+ schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
+ case _ =>
+ None
+ }
} else {
None
}
@@ -1473,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
listenerBus.addListener(listener)
}
- private[spark] override def getExecutorIds(): Seq[String] = {
+ private[spark] def getExecutorIds(): Seq[String] = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.getExecutorIds()
@@ -1498,7 +1504,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is acknowledged by the cluster manager.
*/
@DeveloperApi
- override def requestTotalExecutors(
+ def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
@@ -1518,7 +1524,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
- override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
+ def requestExecutors(numAdditionalExecutors: Int): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1540,10 +1546,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
- override def killExecutors(executorIds: Seq[String]): Boolean = {
+ def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- b.killExecutors(executorIds, replace = false, force = true)
+ b.killExecutors(executorIds, replace = false, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
@@ -1562,7 +1568,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
- override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
+ def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
/**
* Request that the cluster manager kill the specified executor without adjusting the
@@ -1581,7 +1587,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
- b.killExecutors(Seq(executorId), replace = true, force = true)
+ b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false