about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala12
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala23
2 files changed, 31 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index d67f70732d..3b33a979df 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -240,9 +240,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
* Get the executors allocated to each receiver.
* @return a map containing receiver ids to optional executor ids.
*/
- def allocatedExecutors(): Map[Int, Option[String]] = {
- endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
- _.runningExecutor.map { _.executorId }
+ def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
+ if (isTrackerStarted) {
+ endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+ _.runningExecutor.map {
+ _.executorId
+ }
+ }
+ } else {
+ Map.empty
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 7654bb2d03..df122ac090 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLo
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.{ConstantInputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver._
/** Test suite for receiver scheduling */
@@ -102,6 +102,27 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}
}
+
+ test("get allocated executors") {
+ // Test get allocated executors when 1 receiver is registered
+ withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+ val input = ssc.receiverStream(new TestReceiver)
+ val output = new TestOutputStream(input)
+ output.register()
+ ssc.start()
+ assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1)
+ }
+
+ // Test get allocated executors when there's no receiver registered
+ withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+ val rdd = ssc.sc.parallelize(1 to 10)
+ val input = new ConstantInputDStream(ssc, rdd)
+ val output = new TestOutputStream(input)
+ output.register()
+ ssc.start()
+ assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty)
+ }
+ }
}
/** An input DStream for testing rate controlling */