Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 5 +++++
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 7 +++++++
2 files changed, 12 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 15d9710d37..5cfe43a1ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -24,6 +24,7 @@ import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler.InputInfo
import org.apache.spark.streaming.util.WriteAheadLogUtils
/**
@@ -68,6 +69,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
+ // Register the input blocks information into InputInfoTracker
+ val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
+ ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
// Are WAL record handles present with all the blocks
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
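
The added lines sum blockInfos.map(_.numRecords) for the batch and register the total with the scheduler's InputInfoTracker, keyed by validTime. Below is a minimal, self-contained sketch of what such a tracker could look like, assuming a simplified InputInfo(inputStreamId, numRecords) shape; it is illustrative only, not the actual org.apache.spark.streaming.scheduler implementation.

// Hypothetical simplification of an input-info tracker, not Spark's own class.
import scala.collection.mutable

case class InputInfo(inputStreamId: Int, numRecords: Long)

class InputInfoTracker {
  // batch time (ms) -> input stream id -> reported InputInfo
  private val batchTimeToInputInfos =
    new mutable.HashMap[Long, mutable.HashMap[Int, InputInfo]]

  def reportInfo(batchTime: Long, inputInfo: InputInfo): Unit = synchronized {
    val infos = batchTimeToInputInfos.getOrElseUpdate(
      batchTime, new mutable.HashMap[Int, InputInfo])
    require(!infos.contains(inputInfo.inputStreamId),
      s"Input info for stream ${inputInfo.inputStreamId} at batch $batchTime was already reported")
    infos(inputInfo.inputStreamId) = inputInfo
  }

  def getInfo(batchTime: Long): Map[Int, InputInfo] = synchronized {
    batchTimeToInputInfos.get(batchTime).map(_.toMap).getOrElse(Map.empty)
  }
}

With a tracker of this shape, the hunk above corresponds to building InputInfo(id, blockInfos.map(_.numRecords).sum) once per batch and calling reportInfo(validTime, inputInfo).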
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6074532502..93e6b0cd7c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -50,6 +50,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ ssc.addStreamingListener(ssc.progressListener)
+
val input = Seq(1, 2, 3, 4, 5)
// Use "batchCount" to make sure we check the result after all batches finish
val batchCounter = new BatchCounter(ssc)
@@ -72,6 +74,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
fail("Timeout: cannot finish all batches in 30 seconds")
}
+
+ // Verify all "InputInfo"s have been reported
+ assert(ssc.progressListener.numTotalReceivedRecords === input.size)
+ assert(ssc.progressListener.numTotalProcessedRecords === input.size)
+
logInfo("Stopping server")
testServer.stop()
logInfo("Stopping context")