diff options
author | jerryshao <sshao@hortonworks.com> | 2016-04-09 23:34:14 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-04-09 23:34:14 -0700 |
commit | 2c95e4e966b90d2a315350608d4b21b0381dfd11 (patch) | |
tree | 0ba6979e54fdc1a66b97510f6774ec3bca158fdb /streaming/src/test | |
parent | 3fb09afd5e55b9a7a0a332273f09f984a78c3645 (diff) | |
download | spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.tar.gz spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.tar.bz2 spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.zip |
[SPARK-14455][STREAMING] Fix NPE in allocatedExecutors when calling in receiver-less scenario
## What changes were proposed in this pull request?
When calling `ReceiverTracker#allocatedExecutors` in receiver-less scenario, NPE will be thrown, since this `ReceiverTracker` actually is not started and `endpoint` is not created.
This will be happened when playing streaming dynamic allocation with direct Kafka.
## How was this patch tested?
Local integrated test is done.
Author: jerryshao <sshao@hortonworks.com>
Closes #12236 from jerryshao/SPARK-14455.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala | 23 |
1 files changed, 22 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 7654bb2d03..df122ac090 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLo import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.dstream.{ConstantInputDStream, ReceiverInputDStream} import org.apache.spark.streaming.receiver._ /** Testsuite for receiver scheduling */ @@ -102,6 +102,27 @@ class ReceiverTrackerSuite extends TestSuiteBase { } } } + + test("get allocated executors") { + // Test get allocated executors when 1 receiver is registered + withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + val input = ssc.receiverStream(new TestReceiver) + val output = new TestOutputStream(input) + output.register() + ssc.start() + assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1) + } + + // Test get allocated executors when there's no receiver registered + withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + val rdd = ssc.sc.parallelize(1 to 10) + val input = new ConstantInputDStream(ssc, rdd) + val output = new TestOutputStream(input) + output.register() + ssc.start() + assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty) + } + } } /** An input DStream with for testing rate controlling */ |