aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-04-09 23:34:14 -0700
committerAndrew Or <andrew@databricks.com>2016-04-09 23:34:14 -0700
commit2c95e4e966b90d2a315350608d4b21b0381dfd11 (patch)
tree0ba6979e54fdc1a66b97510f6774ec3bca158fdb /streaming
parent3fb09afd5e55b9a7a0a332273f09f984a78c3645 (diff)
downloadspark-2c95e4e966b90d2a315350608d4b21b0381dfd11.tar.gz
spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.tar.bz2
spark-2c95e4e966b90d2a315350608d4b21b0381dfd11.zip
[SPARK-14455][STREAMING] Fix NPE in allocatedExecutors when calling in receiver-less scenario
## What changes were proposed in this pull request? When calling `ReceiverTracker#allocatedExecutors` in a receiver-less scenario, an NPE will be thrown, since this `ReceiverTracker` is not actually started and `endpoint` is not created. This will happen when using streaming dynamic allocation with direct Kafka. ## How was this patch tested? A local integration test was performed. Author: jerryshao <sshao@hortonworks.com> Closes #12236 from jerryshao/SPARK-14455.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala12
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala23
2 files changed, 31 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index d67f70732d..3b33a979df 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -240,9 +240,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
* Get the executors allocated to each receiver.
* @return a map containing receiver ids to optional executor ids.
*/
- def allocatedExecutors(): Map[Int, Option[String]] = {
- endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
- _.runningExecutor.map { _.executorId }
+ def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
+ if (isTrackerStarted) {
+ endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+ _.runningExecutor.map {
+ _.executorId
+ }
+ }
+ } else {
+ Map.empty
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 7654bb2d03..df122ac090 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, TaskLo
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.{ConstantInputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver._
/** Testsuite for receiver scheduling */
@@ -102,6 +102,27 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}
}
+
+ test("get allocated executors") {
+ // Test get allocated executors when 1 receiver is registered
+ withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+ val input = ssc.receiverStream(new TestReceiver)
+ val output = new TestOutputStream(input)
+ output.register()
+ ssc.start()
+ assert(ssc.scheduler.receiverTracker.allocatedExecutors().size === 1)
+ }
+
+ // Test get allocated executors when there's no receiver registered
+ withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+ val rdd = ssc.sc.parallelize(1 to 10)
+ val input = new ConstantInputDStream(ssc, rdd)
+ val output = new TestOutputStream(input)
+ output.register()
+ ssc.start()
+ assert(ssc.scheduler.receiverTracker.allocatedExecutors() === Map.empty)
+ }
+ }
}
/** An input DStream for testing rate controlling */