From 19f32f2d99c3620c0e562a98f7890316ddad1de9 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Fri, 1 Apr 2016 15:26:22 -0700 Subject: [SPARK-12857][STREAMING] Standardize "records" and "events" on "records" ## What changes were proposed in this pull request? Currently the Streaming tab in the web UI uses records and events interchangeably; this PR tries to standardize them on "records". "records" is chosen over "events" because: - "records" is used extensively throughout the streaming documentation, code, and comments - "events" is used only in Streaming UI-related code and comments ## How was this patch tested? - existing test suites - manually checking on the Streaming UI tab Author: Liwei Lin Closes #12032 from lw-lin/streaming-events-to-records. --- .../spark/streaming/receiver/RateLimiter.scala | 2 +- .../spark/streaming/ui/AllBatchesTable.scala | 4 +- .../ui/StreamingJobProgressListener.scala | 10 ++-- .../apache/spark/streaming/ui/StreamingPage.scala | 65 +++++++++++----------- 4 files changed, 41 insertions(+), 40 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index b2189103a0..0a861f22b1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -52,7 +52,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that. * - * @param newRate A new rate in events per second. It has no effect if it's 0 or negative. + * @param newRate A new rate in records per second. It has no effect if it's 0 or negative. 
*/ private[receiver] def updateRate(newRate: Long): Unit = if (newRate > 0) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index d339723427..c024b4ef7e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -52,7 +52,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) protected def baseRow(batch: BatchUIData): Seq[Node] = { val batchTime = batch.batchTime.milliseconds val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval) - val eventCount = batch.numRecords + val numRecords = batch.numRecords val schedulingDelay = batch.schedulingDelay val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-") val processingTime = batch.processingDelay @@ -65,7 +65,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {formattedBatchTime} - {eventCount.toString} events + {numRecords.toString} records {formattedSchedulingDelay} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index d6fcc582b9..6985c37f71 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -202,21 +202,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id) /** - * Return all of the event rates for each InputDStream in each batch. The key of the return value - * is the stream id, and the value is a sequence of batch time with its event rate. 
+ * Return all of the record rates for each InputDStream in each batch. The key of the return value + * is the stream id, and the value is a sequence of batch time with its record rate. */ - def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized { + def receivedRecordRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized { val _retainedBatches = retainedBatches val latestBatches = _retainedBatches.map { batchUIData => (batchUIData.batchTime.milliseconds, batchUIData.streamIdToInputInfo.mapValues(_.numRecords)) } streamIds.map { streamId => - val eventRates = latestBatches.map { + val recordRates = latestBatches.map { case (batchTime, streamIdToNumRecords) => val numRecords = streamIdToNumRecords.getOrElse(streamId, 0L) (batchTime, numRecords * 1000.0 / batchDuration) } - (streamId, eventRates) + (streamId, recordRates) }.toMap } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index fa40436221..b97e24f28b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -125,9 +125,9 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) { * A helper class for "input rate" to generate data that will be used in the timeline and histogram * graphs. * - * @param data (batchTime, event-rate). + * @param data (batch time, record rate). 
*/ -private[ui] class EventRateUIData(val data: Seq[(Long, Double)]) { +private[ui] class RecordRateUIData(val data: Seq[(Long, Double)]) { val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size) @@ -215,7 +215,7 @@ private[ui] class StreamingPage(parent: StreamingTab) val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max - val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo => + val recordRateForAllStreams = new RecordRateUIData(batches.map { batchInfo => (batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration) }) @@ -241,24 +241,24 @@ private[ui] class StreamingPage(parent: StreamingTab) // Use the max input rate for all InputDStreams' graphs to make the Y axis ranges same. // If it's not an integral number, just use its ceil integral number. - val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L) - val minEventRate = 0L + val maxRecordRate = recordRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L) + val minRecordRate = 0L val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit) val jsCollector = new JsCollector - val graphUIDataForEventRateOfAllStreams = + val graphUIDataForRecordRateOfAllStreams = new GraphUIData( - "all-stream-events-timeline", - "all-stream-events-histogram", - eventRateForAllStreams.data, + "all-stream-records-timeline", + "all-stream-records-histogram", + recordRateForAllStreams.data, minBatchTime, maxBatchTime, - minEventRate, - maxEventRate, - "events/sec") - graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector) + minRecordRate, + maxRecordRate, + "records/sec") + graphUIDataForRecordRateOfAllStreams.generateDataJs(jsCollector) val graphUIDataForSchedulingDelay = new GraphUIData( @@ -334,16 +334,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
Receivers: {listener.numActiveReceivers} / {numReceivers} active
} } -
Avg: {eventRateForAllStreams.formattedAvg} events/sec
+
Avg: {recordRateForAllStreams.formattedAvg} records/sec
- {graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)} - {graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)} + {graphUIDataForRecordRateOfAllStreams.generateTimelineHtml(jsCollector)} + {graphUIDataForRecordRateOfAllStreams.generateHistogramHtml(jsCollector)} {if (hasStream) { - {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)} + {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minRecordRate, maxRecordRate)} }} @@ -390,15 +390,16 @@ private[ui] class StreamingPage(parent: StreamingTab) maxX: Long, minY: Double, maxY: Double): Seq[Node] = { - val maxYCalculated = listener.receivedEventRateWithBatchTime.values - .flatMap { case streamAndRates => streamAndRates.map { case (_, eventRate) => eventRate } } + val maxYCalculated = listener.receivedRecordRateWithBatchTime.values + .flatMap { case streamAndRates => streamAndRates.map { case (_, recordRate) => recordRate } } .reduceOption[Double](math.max) .map(_.ceil.toLong) .getOrElse(0L) - val content = listener.receivedEventRateWithBatchTime.toList.sortBy(_._1).map { - case (streamId, eventRates) => - generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxYCalculated) + val content = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map { + case (streamId, recordRates) => + generateInputDStreamRow( + jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated) }.foldLeft[Seq[Node]](Nil)(_ ++ _) // scalastyle:off @@ -422,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab) private def generateInputDStreamRow( jsCollector: JsCollector, streamId: Int, - eventRates: Seq[(Long, Double)], + recordRates: Seq[(Long, Double)], minX: Long, maxX: Long, minY: Double, @@ -447,25 +448,25 @@ private[ui] class StreamingPage(parent: StreamingTab) val receiverLastErrorTime = receiverInfo.map { r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime) 
}.getOrElse(emptyCell) - val receivedRecords = new EventRateUIData(eventRates) + val receivedRecords = new RecordRateUIData(recordRates) - val graphUIDataForEventRate = + val graphUIDataForRecordRate = new GraphUIData( - s"stream-$streamId-events-timeline", - s"stream-$streamId-events-histogram", + s"stream-$streamId-records-timeline", + s"stream-$streamId-records-histogram", receivedRecords.data, minX, maxX, minY, maxY, - "events/sec") - graphUIDataForEventRate.generateDataJs(jsCollector) + "records/sec") + graphUIDataForRecordRate.generateDataJs(jsCollector)
{receiverName}
-
Avg: {receivedRecords.formattedAvg} events/sec
+
Avg: {receivedRecords.formattedAvg} records/sec
{receiverActive} @@ -475,9 +476,9 @@ private[ui] class StreamingPage(parent: StreamingTab) - {graphUIDataForEventRate.generateTimelineHtml(jsCollector)} + {graphUIDataForRecordRate.generateTimelineHtml(jsCollector)} - {graphUIDataForEventRate.generateHistogramHtml(jsCollector)} + {graphUIDataForRecordRate.generateHistogramHtml(jsCollector)} } -- cgit v1.2.3