author    Tathagata Das <tathagata.das1565@gmail.com>    2016-04-06 15:46:20 -0700
committer Andrew Or <andrew@databricks.com>    2016-04-06 15:46:20 -0700
commit    9af5423ec28258becf27dbe89833b4f7d324d26a (patch)
tree      2578d6e122b485e955b6ae389c7f36a46cd051a2 /core
parent    de4792605ad94d3d7548a2139372bb6cac331079 (diff)
[SPARK-12133][STREAMING] Streaming dynamic allocation
## What changes were proposed in this pull request?

Added a new Executor Allocation Manager for the Streaming scheduler for doing Streaming Dynamic Allocation.

## How was this patch tested?

Unit tests, and cluster tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #12154 from tdas/streaming-dynamic-allocation.
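For context, a minimal sketch of how a streaming-side allocation policy could consume the new `getExecutorIds()` hook added to `ExecutorAllocationClient` in the diff below. The `StreamingScalePolicy` class, its package, and the target-based sizing logic are illustrative assumptions, not code from this patch (the streaming changes live outside `core/`):

```scala
package org.apache.spark.streaming.scheduler  // assumed package; getExecutorIds() is private[spark]

import org.apache.spark.ExecutorAllocationClient

// Hypothetical policy: compares the executors currently registered with the
// cluster manager against a desired target and reports the gap.
private[spark] class StreamingScalePolicy(client: ExecutorAllocationClient) {

  /** Positive result: request more executors; negative: some could be removed. */
  def executorDelta(targetExecutors: Int): Int = {
    val currentIds: Seq[String] = client.getExecutorIds()  // new API from this patch
    targetExecutors - currentIds.size
  }
}
```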
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4
3 files changed, 18 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 842bfdbadc..8baddf45bf 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -23,6 +23,10 @@ package org.apache.spark
*/
private[spark] trait ExecutorAllocationClient {
+
+ /** Get the list of currently active executors */
+ private[spark] def getExecutorIds(): Seq[String]
+
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4b3264cbf5..c40fada64b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1360,6 +1360,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
listenerBus.addListener(listener)
}
+ private[spark] override def getExecutorIds(): Seq[String] = {
+ schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ b.getExecutorIds()
+ case _ =>
+ logWarning("Requesting executors is only supported in coarse-grained mode")
+ Nil
+ }
+ }
+
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
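The `SparkContext` override above only delegates when the backend is a `CoarseGrainedSchedulerBackend`; on any other backend (for example `local` mode) it logs a warning and returns `Nil`. A minimal, hypothetical call site, assuming code placed inside the `org.apache.spark` package since the method is `private[spark]`:

```scala
package org.apache.spark  // assumed: required because getExecutorIds() is private[spark]

// Hypothetical helper that reports the currently registered executors.
object ExecutorIdReporter {
  def report(sc: SparkContext): Unit = {
    val executorIds: Seq[String] = sc.getExecutorIds()
    if (executorIds.isEmpty) {
      // Either no executors have registered yet, or the backend is not
      // coarse-grained, in which case the override warned and returned Nil.
      println("No executor IDs reported by the scheduler backend")
    } else {
      println(s"Active executors: ${executorIds.mkString(", ")}")
    }
  }
}
```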
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f71bfd489d..e5abf0e150 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -430,6 +430,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
private def numExistingExecutors: Int = executorDataMap.size
+ override def getExecutorIds(): Seq[String] = {
+ executorDataMap.keySet.toSeq
+ }
+
/**
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged.
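Behaviourally, the new backend method is a point-in-time snapshot of the keys of `executorDataMap` (the registered executor ID strings). A small REPL-style sketch of that idiom with a stand-in map, purely for illustration:

```scala
import scala.collection.mutable

// Stand-in for CoarseGrainedSchedulerBackend.executorDataMap (values elided).
val executorDataMap = mutable.HashMap[String, AnyRef]("1" -> new Object, "2" -> new Object)

// keySet.toSeq copies the IDs, so later registrations or removals in the map
// do not change the sequence handed back to the caller.
val snapshot: Seq[String] = executorDataMap.keySet.toSeq  // e.g. Seq("1", "2") in some order
```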