aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala65
1 files changed, 33 insertions, 32 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index fa40436221..b97e24f28b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -125,9 +125,9 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
* A helper class for "input rate" to generate data that will be used in the timeline and histogram
* graphs.
*
- * @param data (batchTime, event-rate).
+ * @param data (batch time, record rate).
*/
-private[ui] class EventRateUIData(val data: Seq[(Long, Double)]) {
+private[ui] class RecordRateUIData(val data: Seq[(Long, Double)]) {
val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
@@ -215,7 +215,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
- val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo =>
+ val recordRateForAllStreams = new RecordRateUIData(batches.map { batchInfo =>
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
})
@@ -241,24 +241,24 @@ private[ui] class StreamingPage(parent: StreamingTab)
// Use the max input rate for all InputDStreams' graphs to make the Y axis ranges same.
// If it's not an integral number, just use its ceil integral number.
- val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
- val minEventRate = 0L
+ val maxRecordRate = recordRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
+ val minRecordRate = 0L
val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)
val jsCollector = new JsCollector
- val graphUIDataForEventRateOfAllStreams =
+ val graphUIDataForRecordRateOfAllStreams =
new GraphUIData(
- "all-stream-events-timeline",
- "all-stream-events-histogram",
- eventRateForAllStreams.data,
+ "all-stream-records-timeline",
+ "all-stream-records-histogram",
+ recordRateForAllStreams.data,
minBatchTime,
maxBatchTime,
- minEventRate,
- maxEventRate,
- "events/sec")
- graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector)
+ minRecordRate,
+ maxRecordRate,
+ "records/sec")
+ graphUIDataForRecordRateOfAllStreams.generateDataJs(jsCollector)
val graphUIDataForSchedulingDelay =
new GraphUIData(
@@ -334,16 +334,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
<div>Receivers: {listener.numActiveReceivers} / {numReceivers} active</div>
}
}
- <div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div>
+ <div>Avg: {recordRateForAllStreams.formattedAvg} records/sec</div>
</div>
</td>
- <td class="timeline">{graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
- <td class="histogram">{graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
+ <td class="timeline">{graphUIDataForRecordRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForRecordRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
</tr>
{if (hasStream) {
<tr id="inputs-table" style="display: none;" >
<td colspan="3">
- {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
+ {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minRecordRate, maxRecordRate)}
</td>
</tr>
}}
@@ -390,15 +390,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
maxX: Long,
minY: Double,
maxY: Double): Seq[Node] = {
- val maxYCalculated = listener.receivedEventRateWithBatchTime.values
- .flatMap { case streamAndRates => streamAndRates.map { case (_, eventRate) => eventRate } }
+ val maxYCalculated = listener.receivedRecordRateWithBatchTime.values
+ .flatMap { case streamAndRates => streamAndRates.map { case (_, recordRate) => recordRate } }
.reduceOption[Double](math.max)
.map(_.ceil.toLong)
.getOrElse(0L)
- val content = listener.receivedEventRateWithBatchTime.toList.sortBy(_._1).map {
- case (streamId, eventRates) =>
- generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxYCalculated)
+ val content = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map {
+ case (streamId, recordRates) =>
+ generateInputDStreamRow(
+ jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated)
}.foldLeft[Seq[Node]](Nil)(_ ++ _)
// scalastyle:off
@@ -422,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
private def generateInputDStreamRow(
jsCollector: JsCollector,
streamId: Int,
- eventRates: Seq[(Long, Double)],
+ recordRates: Seq[(Long, Double)],
minX: Long,
maxX: Long,
minY: Double,
@@ -447,25 +448,25 @@ private[ui] class StreamingPage(parent: StreamingTab)
val receiverLastErrorTime = receiverInfo.map {
r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime)
}.getOrElse(emptyCell)
- val receivedRecords = new EventRateUIData(eventRates)
+ val receivedRecords = new RecordRateUIData(recordRates)
- val graphUIDataForEventRate =
+ val graphUIDataForRecordRate =
new GraphUIData(
- s"stream-$streamId-events-timeline",
- s"stream-$streamId-events-histogram",
+ s"stream-$streamId-records-timeline",
+ s"stream-$streamId-records-histogram",
receivedRecords.data,
minX,
maxX,
minY,
maxY,
- "events/sec")
- graphUIDataForEventRate.generateDataJs(jsCollector)
+ "records/sec")
+ graphUIDataForRecordRate.generateDataJs(jsCollector)
<tr>
<td rowspan="2" style="vertical-align: middle; width: 151px;">
<div style="width: 151px;">
<div style="word-wrap: break-word;"><strong>{receiverName}</strong></div>
- <div>Avg: {receivedRecords.formattedAvg} events/sec</div>
+ <div>Avg: {receivedRecords.formattedAvg} records/sec</div>
</div>
</td>
<td>{receiverActive}</td>
@@ -475,9 +476,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
</tr>
<tr>
<td colspan="3" class="timeline">
- {graphUIDataForEventRate.generateTimelineHtml(jsCollector)}
+ {graphUIDataForRecordRate.generateTimelineHtml(jsCollector)}
</td>
- <td class="histogram">{graphUIDataForEventRate.generateHistogramHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForRecordRate.generateHistogramHtml(jsCollector)}</td>
</tr>
}