diff options
author | jerryshao <sshao@hortonworks.com> | 2016-04-09 23:34:14 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-04-09 23:34:14 -0700 |
commit | 2c95e4e966b90d2a315350608d4b21b0381dfd11 (patch) | |
tree | 0ba6979e54fdc1a66b97510f6774ec3bca158fdb | |
parent | 3fb09afd5e55b9a7a0a332273f09f984a78c3645 (diff) | |
download | spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.tar.gz spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.tar.bz2 spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.zip |
[SPARK-14455][STREAMING] Fix NPE in allocatedExecutors when calling in receiver-less scenario
## What changes were proposed in this pull request?
When calling `ReceiverTracker#allocatedExecutors` in receiver-less scenario, NPE will be thrown, since this `ReceiverTracker` actually is not started and `endpoint` is not created.
This will happen when using streaming dynamic allocation with direct Kafka.
## How was this patch tested?
Local integration testing was done.
Author: jerryshao <sshao@hortonworks.com>
Closes #12236 from jerryshao/SPARK-14455.
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 12 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala | 23 |
2 files changed, 31 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index d67f70732d..3b33a979df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -240,9 +240,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false * Get the executors allocated to each receiver. * @return a map containing receiver ids to optional executor ids. */ - def allocatedExecutors(): Map[Int, Option[String]] = { - endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { - _.runningExecutor.map { _.executorId } + def allocatedExecutors(): Map[Int, Option[String]] = synchronized { + if (isTrackerStarted) { + endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { + _.runningExecutor.map { + _.executorId + } + } + } else { + Map.empty } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 7654bb2d03..df122ac090 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLo import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.dstream.{ConstantInputDStream, ReceiverInputDStream} import org.apache.spark.streaming.receiver._ /** Testsuite for receiver scheduling */ @@ -102,6 +102,27 @@ 
class ReceiverTrackerSuite extends TestSuiteBase { } } } + + test("get allocated executors") { + // Test get allocated executors when 1 receiver is registered + withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + val input = ssc.receiverStream(new TestReceiver) + val output = new TestOutputStream(input) + output.register() + ssc.start() + assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1) + } + + // Test get allocated executors when there's no receiver registered + withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + val rdd = ssc.sc.parallelize(1 to 10) + val input = new ConstantInputDStream(ssc, rdd) + val output = new TestOutputStream(input) + output.register() + ssc.start() + assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty) + } + } } /** An input DStream with for testing rate controlling */ |