diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-04-06 15:46:20 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-04-06 15:46:20 -0700 |
commit | 9af5423ec28258becf27dbe89833b4f7d324d26a (patch) | |
tree | 2578d6e122b485e955b6ae389c7f36a46cd051a2 /core/src/main/scala/org/apache | |
parent | de4792605ad94d3d7548a2139372bb6cac331079 (diff) | |
download | spark-9af5423ec28258becf27dbe89833b4f7d324d26a.tar.gz spark-9af5423ec28258becf27dbe89833b4f7d324d26a.tar.bz2 spark-9af5423ec28258becf27dbe89833b4f7d324d26a.zip |
[SPARK-12133][STREAMING] Streaming dynamic allocation
## What changes were proposed in this pull request?
Added a new Executor Allocation Manager for the Streaming scheduler for doing Streaming Dynamic Allocation.
## How was this patch tested
Unit tests, and cluster tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #12154 from tdas/streaming-dynamic-allocation.
Diffstat (limited to 'core/src/main/scala/org/apache')
3 files changed, 18 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 842bfdbadc..8baddf45bf 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -23,6 +23,10 @@ package org.apache.spark */ private[spark] trait ExecutorAllocationClient { + + /** Get the list of currently active executors */ + private[spark] def getExecutorIds(): Seq[String] + /** * Update the cluster manager on our scheduling needs. Three bits of information are included * to help it make decisions. diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4b3264cbf5..c40fada64b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1360,6 +1360,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli listenerBus.addListener(listener) } + private[spark] override def getExecutorIds(): Seq[String] = { + schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.getExecutorIds() + case _ => + logWarning("Requesting executors is only supported in coarse-grained mode") + Nil + } + } + /** * Update the cluster manager on our scheduling needs. Three bits of information are included * to help it make decisions. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f71bfd489d..e5abf0e150 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -430,6 +430,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ private def numExistingExecutors: Int = executorDataMap.size + override def getExecutorIds(): Seq[String] = { + executorDataMap.keySet.toSeq + } + /** * Request an additional number of executors from the cluster manager. * @return whether the request is acknowledged. |