path: root/streaming/src
author     zsxwing <zsxwing@gmail.com>  2015-07-09 13:48:29 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-07-09 13:48:29 -0700
commit     1f6b0b1234cc03aa2e07aea7fec2de7563885238 (patch)
tree       c1a8c2d2f750ca5d8db14f6bf1558d58649396e5 /streaming/src
parent     c4830598b271cc6390d127bd4cf8ab02b28792e0 (diff)
[SPARK-8701] [STREAMING] [WEBUI] Add input metadata in the batch page
This PR adds `metadata` to `InputInfo`. An `InputDStream` can report metadata for each batch, and it will be shown in the batch page. For example:

![screen shot](https://cloud.githubusercontent.com/assets/1000778/8403741/d6ffc7e2-1e79-11e5-9888-c78c1575123a.png)

FileInputDStream will display the new files for a batch, and DirectKafkaInputDStream will display its offset ranges.

Author: zsxwing <zsxwing@gmail.com>

Closes #7081 from zsxwing/input-metadata and squashes the following commits:

f7abd9b [zsxwing] Revert the space changes in project/MimaExcludes.scala
d906209 [zsxwing] Merge branch 'master' into input-metadata
74762da [zsxwing] Fix MiMa tests
7903e33 [zsxwing] Merge branch 'master' into input-metadata
450a46c [zsxwing] Address comments
1d94582 [zsxwing] Rename InputInfo to StreamInputInfo and change "metadata" to Map[String, Any]
d496ae9 [zsxwing] Add input metadata in the batch page
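As a rough sketch of what the new hook enables (illustrative only, not code from this patch: CustomInputDStream is a hypothetical class, and since inputInfoTracker is private[streaming] such a class would need to live under the org.apache.spark.streaming package):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.scheduler.StreamInputInfo

    class CustomInputDStream(ssc_ : StreamingContext) extends InputDStream[String](ssc_) {
      override def start(): Unit = {}
      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[String]] = {
        val records = Seq("a", "b", "c")  // stand-in for records actually read
        // Report the record count plus a human-readable description for the batch page
        val metadata = Map(
          StreamInputInfo.METADATA_KEY_DESCRIPTION -> s"Read ${records.size} records")
        ssc.scheduler.inputInfoTracker.reportInfo(
          validTime, StreamInputInfo(id, records.size.toLong, metadata))
        Some(ssc.sc.makeRDD(records))
      }
    }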
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala             10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala          4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala                    9
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala            38
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala                 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala                       4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala                          43
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala                         8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala        5
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala                 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala                          2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala        8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala  28
13 files changed, 122 insertions, 46 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 86a8e2beff..dd4da9d9ca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
+import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
/**
@@ -144,7 +145,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
batchTimeToSelectedFiles += ((validTime, newFiles))
recentlySelectedFiles ++= newFiles
- Some(filesToRDD(newFiles))
+ val rdds = Some(filesToRDD(newFiles))
+ // Copy newFiles to an immutable.List to prevent it from being modified by the user
+ val metadata = Map(
+ "files" -> newFiles.toList,
+ StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
+ val inputInfo = StreamInputInfo(id, 0, metadata)
+ ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+ rdds
}
/** Clear the old time-to-files mappings along with old RDDs */
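For concreteness, a sketch of the info the hunk above reports when a batch picks up two new files (paths are illustrative; numRecords stays 0 because FileInputDStream reports file names rather than record counts):

    val newFiles = Seq("hdfs://host/logs/part-0001", "hdfs://host/logs/part-0002")
    val metadata = Map(
      "files" -> newFiles.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    val info = StreamInputInfo(inputStreamId = 0, numRecords = 0, metadata = metadata)
    // info.metadataDescription == Some("hdfs://host/logs/part-0001\nhdfs://host/logs/part-0002")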
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index e76e7eb0de..a50f0efc03 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -24,7 +24,7 @@ import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.InputInfo
+import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.streaming.util.WriteAheadLogUtils
/**
@@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
// Register the input blocks information into InputInfoTracker
- val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum)
+ val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
if (blockInfos.nonEmpty) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 5b9bfbf9b0..9922b6bc12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -24,7 +24,7 @@ import org.apache.spark.streaming.Time
* :: DeveloperApi ::
* Class having information on completed batches.
* @param batchTime Time of the batch
- * @param streamIdToNumRecords A map of input stream id to record number
+ * @param streamIdToInputInfo A map of input stream id to its input info
* @param submissionTime Clock time of when jobs of this batch were submitted to
* the streaming scheduler queue
* @param processingStartTime Clock time of when the first job of this batch started processing
@@ -33,12 +33,15 @@ import org.apache.spark.streaming.Time
@DeveloperApi
case class BatchInfo(
batchTime: Time,
- streamIdToNumRecords: Map[Int, Long],
+ streamIdToInputInfo: Map[Int, StreamInputInfo],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
) {
+ @deprecated("Use streamIdToInputInfo instead", "1.5.0")
+ def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
+
/**
* Time taken for the first job of this batch to start processing from the time this batch
* was submitted to the streaming scheduler. Essentially, it is
@@ -63,5 +66,5 @@ case class BatchInfo(
/**
* The number of records received by the receivers in this batch.
*/
- def numRecords: Long = streamIdToNumRecords.values.sum
+ def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
}
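A quick sketch of the compatibility shim above: old callers of streamIdToNumRecords keep working because it is now derived from streamIdToInputInfo (values are illustrative):

    import org.apache.spark.streaming.Time
    import org.apache.spark.streaming.scheduler.{BatchInfo, StreamInputInfo}

    val infos = Map(0 -> StreamInputInfo(0, 100L), 1 -> StreamInputInfo(1, 300L))
    val batch = BatchInfo(Time(1000), infos, 1000L, None, None)
    batch.streamIdToNumRecords  // Map(0 -> 100, 1 -> 300), now with a deprecation warning
    batch.numRecords            // 400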
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
index 7c0db8a863..363c03d431 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
@@ -20,11 +20,34 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable
import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.streaming.{Time, StreamingContext}
-/** To track the information of input stream at specified batch time. */
-private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
+/**
+ * :: DeveloperApi ::
+ * Track the information of an input stream at the specified batch time.
+ *
+ * @param inputStreamId the input stream id
+ * @param numRecords the number of records in a batch
+ * @param metadata metadata for this batch. It should contain at least one standard field named
+ * "Description" which maps to the content that will be shown in the UI.
+ */
+@DeveloperApi
+case class StreamInputInfo(
+ inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty) {
require(numRecords >= 0, "numRecords must not be negative")
+
+ def metadataDescription: Option[String] =
+ metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
+}
+
+@DeveloperApi
+object StreamInputInfo {
+
+ /**
+ * The key for description in `StreamInputInfo.metadata`.
+ */
+ val METADATA_KEY_DESCRIPTION: String = "Description"
}
/**
@@ -34,12 +57,13 @@ private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging {
// Map to track all the InputInfo related to specific batch time and input stream.
- private val batchTimeToInputInfos = new mutable.HashMap[Time, mutable.HashMap[Int, InputInfo]]
+ private val batchTimeToInputInfos =
+ new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]
/** Report the input information with batch time to the tracker */
- def reportInfo(batchTime: Time, inputInfo: InputInfo): Unit = synchronized {
+ def reportInfo(batchTime: Time, inputInfo: StreamInputInfo): Unit = synchronized {
val inputInfos = batchTimeToInputInfos.getOrElseUpdate(batchTime,
- new mutable.HashMap[Int, InputInfo]())
+ new mutable.HashMap[Int, StreamInputInfo]())
if (inputInfos.contains(inputInfo.inputStreamId)) {
throw new IllegalStateException(s"Input stream ${inputInfo.inputStreamId} for batch" +
@@ -49,10 +73,10 @@ private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging
}
/** Get all the input streams' information for the specified batch time */
- def getInfo(batchTime: Time): Map[Int, InputInfo] = synchronized {
+ def getInfo(batchTime: Time): Map[Int, StreamInputInfo] = synchronized {
val inputInfos = batchTimeToInputInfos.get(batchTime)
// Convert mutable HashMap to immutable Map for the caller
- inputInfos.map(_.toMap).getOrElse(Map[Int, InputInfo]())
+ inputInfos.map(_.toMap).getOrElse(Map[Int, StreamInputInfo]())
}
/** Cleanup the tracked input information older than threshold batch time */
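Putting the renamed type through the tracker, a sketch mirroring the updated InputInfoTrackerSuite further down (assumes ssc is an active StreamingContext; note InputInfoTracker is private[streaming]):

    val tracker = new InputInfoTracker(ssc)
    tracker.reportInfo(Time(0L), StreamInputInfo(0, 100L))
    tracker.reportInfo(Time(0L), StreamInputInfo(1, 300L))
    tracker.getInfo(Time(0L))
    // Map(0 -> StreamInputInfo(0, 100), 1 -> StreamInputInfo(1, 300))
    // A second report for the same stream id at the same batch time throws IllegalStateException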
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 9f93d6cbc3..f5d4185864 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -244,8 +244,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
- val streamIdToNumRecords = streamIdToInputInfos.mapValues(_.numRecords)
- jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToNumRecords))
+ jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index e6be63b2dd..95833efc94 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -28,7 +28,7 @@ private[streaming]
case class JobSet(
time: Time,
jobs: Seq[Job],
- streamIdToNumRecords: Map[Int, Long] = Map.empty) {
+ streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {
private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -64,7 +64,7 @@ case class JobSet(
def toBatchInfo: BatchInfo = {
new BatchInfo(
time,
- streamIdToNumRecords,
+ streamIdToInputInfo,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index f75067669a..0c891662c2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -17,11 +17,9 @@
package org.apache.spark.streaming.ui
-import java.text.SimpleDateFormat
-import java.util.Date
import javax.servlet.http.HttpServletRequest
-import scala.xml.{NodeSeq, Node, Text}
+import scala.xml.{NodeSeq, Node, Text, Unparsed}
import org.apache.commons.lang3.StringEscapeUtils
@@ -303,6 +301,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
batchUIData.processingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val formattedTotalDelay = batchUIData.totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+ val inputMetadatas = batchUIData.streamIdToInputInfo.values.flatMap { inputInfo =>
+ inputInfo.metadataDescription.map(desc => inputInfo.inputStreamId -> desc)
+ }.toSeq
val summary: NodeSeq =
<div>
<ul class="unstyled">
@@ -326,6 +327,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<strong>Total delay: </strong>
{formattedTotalDelay}
</li>
+ {
+ if (inputMetadatas.nonEmpty) {
+ <li>
+ <strong>Input Metadata:</strong>{generateInputMetadataTable(inputMetadatas)}
+ </li>
+ }
+ }
</ul>
</div>
@@ -340,4 +348,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
SparkUIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
}
+
+ def generateInputMetadataTable(inputMetadatas: Seq[(Int, String)]): Seq[Node] = {
+ <table class={SparkUIUtils.TABLE_CLASS_STRIPED}>
+ <thead>
+ <tr>
+ <th>Input</th>
+ <th>Metadata</th>
+ </tr>
+ </thead>
+ <tbody>
+ {inputMetadatas.flatMap(generateInputMetadataRow)}
+ </tbody>
+ </table>
+ }
+
+ def generateInputMetadataRow(inputMetadata: (Int, String)): Seq[Node] = {
+ val streamId = inputMetadata._1
+
+ <tr>
+ <td>{streamingListener.streamName(streamId).getOrElse(s"Stream-$streamId")}</td>
+ <td>{metadataDescriptionToHTML(inputMetadata._2)}</td>
+ </tr>
+ }
+
+ private def metadataDescriptionToHTML(metadataDescription: String): Seq[Node] = {
+ // Convert each tab to 4 spaces and each "\n" to "<br/>"
+ Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
+ replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
+ }
}
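For reference, the conversion above is escape-then-substitute, so user-supplied markup cannot be injected into the page; a sketch of the effect on a description containing a tab, a newline, and HTML:

    val desc = "topic-0\toffsets: [0, 100)\n<b>done</b>"
    // metadataDescriptionToHTML(desc) emits the unparsed HTML:
    // topic-0&nbsp;&nbsp;&nbsp;&nbsp;offsets: [0, 100)<br/>&lt;b&gt;done&lt;/b&gt;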
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index a5514dfd71..ae508c0e95 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -19,14 +19,14 @@
package org.apache.spark.streaming.ui
import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.scheduler.BatchInfo
+import org.apache.spark.streaming.scheduler.{BatchInfo, StreamInputInfo}
import org.apache.spark.streaming.ui.StreamingJobProgressListener._
private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId)
private[ui] case class BatchUIData(
val batchTime: Time,
- val streamIdToNumRecords: Map[Int, Long],
+ val streamIdToInputInfo: Map[Int, StreamInputInfo],
val submissionTime: Long,
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
@@ -58,7 +58,7 @@ private[ui] case class BatchUIData(
/**
* The number of records received by the receivers in this batch.
*/
- def numRecords: Long = streamIdToNumRecords.values.sum
+ def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
}
private[ui] object BatchUIData {
@@ -66,7 +66,7 @@ private[ui] object BatchUIData {
def apply(batchInfo: BatchInfo): BatchUIData = {
new BatchUIData(
batchInfo.batchTime,
- batchInfo.streamIdToNumRecords,
+ batchInfo.streamIdToInputInfo,
batchInfo.submissionTime,
batchInfo.processingStartTime,
batchInfo.processingEndTime
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 68e8ce9894..b77c555c68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -192,7 +192,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
val _retainedBatches = retainedBatches
val latestBatches = _retainedBatches.map { batchUIData =>
- (batchUIData.batchTime.milliseconds, batchUIData.streamIdToNumRecords)
+ (batchUIData.batchTime.milliseconds, batchUIData.streamIdToInputInfo.mapValues(_.numRecords))
}
streamIds.map { streamId =>
val eventRates = latestBatches.map {
@@ -205,7 +205,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
- val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.streamIdToNumRecords)
+ val lastReceivedBlockInfoOption =
+ lastReceivedBatch.map(_.streamIdToInputInfo.mapValues(_.numRecords))
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
streamIds.map { streamId =>
(streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 7bc7727a9f..4bc1dd4a30 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -59,7 +59,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
batchInfosSubmitted.foreach { info =>
info.numRecords should be (1L)
- info.streamIdToNumRecords should be (Map(0 -> 1L))
+ info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
}
isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)
@@ -77,7 +77,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
batchInfosStarted.foreach { info =>
info.numRecords should be (1L)
- info.streamIdToNumRecords should be (Map(0 -> 1L))
+ info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
}
isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
@@ -98,7 +98,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
batchInfosCompleted.foreach { info =>
info.numRecords should be (1L)
- info.streamIdToNumRecords should be (Map(0 -> 1L))
+ info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
}
isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 31b1aebf6a..0d58a7b544 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -76,7 +76,7 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
}
// Report the input data's information to InputInfoTracker for testing
- val inputInfo = InputInfo(id, selectedInput.length.toLong)
+ val inputInfo = StreamInputInfo(id, selectedInput.length.toLong)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
index 2e210397fe..f5248acf71 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/InputInfoTrackerSuite.scala
@@ -46,8 +46,8 @@ class InputInfoTrackerSuite extends SparkFunSuite with BeforeAndAfter {
val streamId1 = 0
val streamId2 = 1
val time = Time(0L)
- val inputInfo1 = InputInfo(streamId1, 100L)
- val inputInfo2 = InputInfo(streamId2, 300L)
+ val inputInfo1 = StreamInputInfo(streamId1, 100L)
+ val inputInfo2 = StreamInputInfo(streamId2, 300L)
inputInfoTracker.reportInfo(time, inputInfo1)
inputInfoTracker.reportInfo(time, inputInfo2)
@@ -63,8 +63,8 @@ class InputInfoTrackerSuite extends SparkFunSuite with BeforeAndAfter {
val inputInfoTracker = new InputInfoTracker(ssc)
val streamId1 = 0
- val inputInfo1 = InputInfo(streamId1, 100L)
- val inputInfo2 = InputInfo(streamId1, 300L)
+ val inputInfo1 = StreamInputInfo(streamId1, 100L)
+ val inputInfo2 = StreamInputInfo(streamId1, 300L)
inputInfoTracker.reportInfo(Time(0), inputInfo1)
inputInfoTracker.reportInfo(Time(1), inputInfo2)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index c9175d61b1..40dc1fb601 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -49,10 +49,12 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val ssc = setupStreams(input, operation)
val listener = new StreamingJobProgressListener(ssc)
- val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
+ val streamIdToInputInfo = Map(
+ 0 -> StreamInputInfo(0, 300L),
+ 1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
// onBatchSubmitted
- val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, None, None)
+ val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
listener.runningBatches should be (Nil)
@@ -64,7 +66,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.numTotalReceivedRecords should be (0)
// onBatchStarted
- val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
@@ -94,7 +96,9 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
- batchUIData.get.streamIdToNumRecords should be (Map(0 -> 300L, 1 -> 300L))
+ batchUIData.get.streamIdToInputInfo should be (Map(
+ 0 -> StreamInputInfo(0, 300L),
+ 1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test"))))
batchUIData.get.numRecords should be(600)
batchUIData.get.outputOpIdSparkJobIdPairs should be
Seq(OutputOpIdAndSparkJobId(0, 0),
@@ -103,7 +107,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
OutputOpIdAndSparkJobId(1, 1))
// onBatchCompleted
- val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (Nil)
@@ -141,9 +145,9 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
val listener = new StreamingJobProgressListener(ssc)
- val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
+ val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
- val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
for(_ <- 0 until (limit + 10)) {
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
@@ -182,7 +186,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
- batchUIData.get.streamIdToNumRecords should be (Map.empty)
+ batchUIData.get.streamIdToInputInfo should be (Map.empty)
batchUIData.get.numRecords should be (0)
batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
@@ -211,14 +215,14 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
for (_ <- 0 until 2 * limit) {
- val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
+ val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
// onBatchSubmitted
- val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, None, None)
+ val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
// onBatchStarted
- val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
// onJobStart
@@ -235,7 +239,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.onJobStart(jobStart4)
// onBatchCompleted
- val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}