author     jerryshao <sshao@hortonworks.com>   2016-04-09 23:34:14 -0700
committer  Andrew Or <andrew@databricks.com>   2016-04-09 23:34:14 -0700
commit     2c95e4e966b90d2a315350608d4b21b0381dfd11 (patch)
tree       0ba6979e54fdc1a66b97510f6774ec3bca158fdb /streaming/src/test
parent     3fb09afd5e55b9a7a0a332273f09f984a78c3645 (diff)
download   spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.tar.gz
           spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.tar.bz2
           spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.zip
[SPARK-14455][STREAMING] Fix NPE in allocatedExecutors when calling in receiver-less scenario
## What changes were proposed in this pull request?

When calling `ReceiverTracker#allocatedExecutors` in a receiver-less scenario, an NPE is thrown, because the `ReceiverTracker` is never actually started and its `endpoint` is not created. This happens when using streaming dynamic allocation with direct Kafka.

## How was this patch tested?

Local integration test.

Author: jerryshao <sshao@hortonworks.com>

Closes #12236 from jerryshao/SPARK-14455.
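The diffstat below only covers the new test; the production-side guard in `ReceiverTracker` is not part of this diff. As a rough, standalone sketch of the idea (class and method names are illustrative, not the actual Spark source): return an empty map when the tracker was never started instead of dereferencing a null `endpoint`.

```scala
// Standalone sketch of the guard described above -- illustrative names only,
// not the actual org.apache.spark.streaming.scheduler.ReceiverTracker code.
object AllocatedExecutorsSketch {

  // Stand-in for the RPC endpoint that only exists once the tracker starts.
  final class TrackerEndpoint {
    def askAllocatedExecutors(): Map[Int, Option[String]] =
      Map(0 -> Some("executor-1"))
  }

  class Tracker {
    // Stays null for receiver-less jobs, since the tracker is never started.
    private var endpoint: TrackerEndpoint = null

    private def isTrackerStarted: Boolean = endpoint != null

    def start(): Unit = { endpoint = new TrackerEndpoint }

    // Guarded lookup: without the isTrackerStarted check, a receiver-less job
    // would hit a NullPointerException on `endpoint`.
    def allocatedExecutors(): Map[Int, Option[String]] =
      if (isTrackerStarted) endpoint.askAllocatedExecutors() else Map.empty
  }

  def main(args: Array[String]): Unit = {
    val tracker = new Tracker             // never started, as in the direct Kafka case
    println(tracker.allocatedExecutors()) // prints Map() instead of throwing an NPE
  }
}
```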
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala  23
1 file changed, 22 insertions(+), 1 deletion(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 7654bb2d03..df122ac090 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLo
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.{ConstantInputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver._
/** Testsuite for receiver scheduling */
@@ -102,6 +102,27 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}
}
+
+ test("get allocated executors") {
+ // Test get allocated executors when 1 receiver is registered
+ withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+ val input = ssc.receiverStream(new TestReceiver)
+ val output = new TestOutputStream(input)
+ output.register()
+ ssc.start()
+ assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1)
+ }
+
+ // Test get allocated executors when there's no receiver registered
+ withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+ val rdd = ssc.sc.parallelize(1 to 10)
+ val input = new ConstantInputDStream(ssc, rdd)
+ val output = new TestOutputStream(input)
+ output.register()
+ ssc.start()
+ assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty)
+ }
+ }
}
/** An input DStream used for testing rate controlling */