diff options
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 65 |
1 files changed, 33 insertions, 32 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index fa40436221..b97e24f28b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -125,9 +125,9 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) { * A helper class for "input rate" to generate data that will be used in the timeline and histogram * graphs. * - * @param data (batchTime, event-rate). + * @param data (batch time, record rate). */ -private[ui] class EventRateUIData(val data: Seq[(Long, Double)]) { +private[ui] class RecordRateUIData(val data: Seq[(Long, Double)]) { val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size) @@ -215,7 +215,7 @@ private[ui] class StreamingPage(parent: StreamingTab) val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max - val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo => + val recordRateForAllStreams = new RecordRateUIData(batches.map { batchInfo => (batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration) }) @@ -241,24 +241,24 @@ private[ui] class StreamingPage(parent: StreamingTab) // Use the max input rate for all InputDStreams' graphs to make the Y axis ranges same. // If it's not an integral number, just use its ceil integral number. - val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L) - val minEventRate = 0L + val maxRecordRate = recordRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L) + val minRecordRate = 0L val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit) val jsCollector = new JsCollector - val graphUIDataForEventRateOfAllStreams = + val graphUIDataForRecordRateOfAllStreams = new GraphUIData( - "all-stream-events-timeline", - "all-stream-events-histogram", - eventRateForAllStreams.data, + "all-stream-records-timeline", + "all-stream-records-histogram", + recordRateForAllStreams.data, minBatchTime, maxBatchTime, - minEventRate, - maxEventRate, - "events/sec") - graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector) + minRecordRate, + maxRecordRate, + "records/sec") + graphUIDataForRecordRateOfAllStreams.generateDataJs(jsCollector) val graphUIDataForSchedulingDelay = new GraphUIData( @@ -334,16 +334,16 @@ private[ui] class StreamingPage(parent: StreamingTab) <div>Receivers: {listener.numActiveReceivers} / {numReceivers} active</div> } } - <div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div> + <div>Avg: {recordRateForAllStreams.formattedAvg} records/sec</div> </div> </td> - <td class="timeline">{graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)}</td> - <td class="histogram">{graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)}</td> + <td class="timeline">{graphUIDataForRecordRateOfAllStreams.generateTimelineHtml(jsCollector)}</td> + <td class="histogram">{graphUIDataForRecordRateOfAllStreams.generateHistogramHtml(jsCollector)}</td> </tr> {if (hasStream) { <tr id="inputs-table" style="display: none;" > <td colspan="3"> - {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)} + {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minRecordRate, maxRecordRate)} </td> </tr> }} @@ -390,15 +390,16 @@ private[ui] class StreamingPage(parent: StreamingTab) maxX: Long, minY: Double, maxY: Double): Seq[Node] = { - val maxYCalculated = listener.receivedEventRateWithBatchTime.values - .flatMap { case streamAndRates => streamAndRates.map { case (_, eventRate) => eventRate } } + val maxYCalculated = listener.receivedRecordRateWithBatchTime.values + .flatMap { case streamAndRates => streamAndRates.map { case (_, recordRate) => recordRate } } .reduceOption[Double](math.max) .map(_.ceil.toLong) .getOrElse(0L) - val content = listener.receivedEventRateWithBatchTime.toList.sortBy(_._1).map { - case (streamId, eventRates) => - generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxYCalculated) + val content = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map { + case (streamId, recordRates) => + generateInputDStreamRow( + jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated) }.foldLeft[Seq[Node]](Nil)(_ ++ _) // scalastyle:off @@ -422,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab) private def generateInputDStreamRow( jsCollector: JsCollector, streamId: Int, - eventRates: Seq[(Long, Double)], + recordRates: Seq[(Long, Double)], minX: Long, maxX: Long, minY: Double, @@ -447,25 +448,25 @@ private[ui] class StreamingPage(parent: StreamingTab) val receiverLastErrorTime = receiverInfo.map { r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime) }.getOrElse(emptyCell) - val receivedRecords = new EventRateUIData(eventRates) + val receivedRecords = new RecordRateUIData(recordRates) - val graphUIDataForEventRate = + val graphUIDataForRecordRate = new GraphUIData( - s"stream-$streamId-events-timeline", - s"stream-$streamId-events-histogram", + s"stream-$streamId-records-timeline", + s"stream-$streamId-records-histogram", receivedRecords.data, minX, maxX, minY, maxY, - "events/sec") - graphUIDataForEventRate.generateDataJs(jsCollector) + "records/sec") + graphUIDataForRecordRate.generateDataJs(jsCollector) <tr> <td rowspan="2" style="vertical-align: middle; width: 151px;"> <div style="width: 151px;"> <div style="word-wrap: break-word;"><strong>{receiverName}</strong></div> - <div>Avg: {receivedRecords.formattedAvg} events/sec</div> + <div>Avg: {receivedRecords.formattedAvg} records/sec</div> </div> </td> <td>{receiverActive}</td> @@ -475,9 +476,9 @@ private[ui] class StreamingPage(parent: StreamingTab) </tr> <tr> <td colspan="3" class="timeline"> - {graphUIDataForEventRate.generateTimelineHtml(jsCollector)} + {graphUIDataForRecordRate.generateTimelineHtml(jsCollector)} </td> - <td class="histogram">{graphUIDataForEventRate.generateHistogramHtml(jsCollector)}</td> + <td class="histogram">{graphUIDataForRecordRate.generateHistogramHtml(jsCollector)}</td> </tr> } |