author    Liwei Lin <lwlin7@gmail.com>    2016-04-01 15:26:22 -0700
committer Sean Owen <sowen@cloudera.com>  2016-04-01 15:26:22 -0700
commit    19f32f2d99c3620c0e562a98f7890316ddad1de9 (patch)
tree      70fb0d0b73248959e846824e12d88c42a69d8896 /streaming/src
parent    c16a396886672493df694f3ca30478c8edb771f0 (diff)
[SPARK-12857][STREAMING] Standardize "records" and "events" on "records"
## What changes were proposed in this pull request?

Currently the Streaming tab in the web UI uses "records" and "events" interchangeably; this PR standardizes them on "records". "records" is chosen over "events" because:

- "records" is used extensively throughout the streaming documentation, code, and comments
- "events" is used only in Streaming-UI-related code and comments

## How was this patch tested?

- existing test suites
- manually checking the Streaming UI tab

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12032 from lw-lin/streaming-events-to-records.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala             |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala               |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala  | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala                 | 65
4 files changed, 41 insertions(+), 40 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index b2189103a0..0a861f22b1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -52,7 +52,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
* Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
* {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
*
- * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
+ * @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
*/
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
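
[Editor's note] For context, a minimal standalone sketch of the clamping behavior the doc comment above describes — not Spark's actual RateLimiter, whose internals are not shown in this diff: a non-positive newRate has no effect, and a positive one never exceeds the configured spark.streaming.receiver.maxRate.

class SimpleRateLimiter(maxRatePerSec: Long) {
  // Current limit in records per second (the unit this patch standardizes on).
  private var currentRate: Long = maxRatePerSec

  // Mirrors the documented contract: no effect for newRate <= 0; otherwise
  // clamp to the configured maximum (maxRatePerSec <= 0 taken as "unlimited").
  def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      currentRate = if (maxRatePerSec > 0) math.min(newRate, maxRatePerSec) else newRate
    }

  def rate: Long = currentRate
}
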
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index d339723427..c024b4ef7e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -52,7 +52,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
- val eventCount = batch.numRecords
+ val numRecords = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
@@ -65,7 +65,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
{formattedBatchTime}
</a>
</td>
- <td sorttable_customkey={eventCount.toString}>{eventCount.toString} events</td>
+ <td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
<td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
{formattedSchedulingDelay}
</td>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index d6fcc582b9..6985c37f71 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -202,21 +202,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
/**
- * Return all of the event rates for each InputDStream in each batch. The key of the return value
- * is the stream id, and the value is a sequence of batch time with its event rate.
+ * Return all of the record rates for each InputDStream in each batch. The key of the return value
+ * is the stream id, and the value is a sequence of batch time with its record rate.
*/
- def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
+ def receivedRecordRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
val _retainedBatches = retainedBatches
val latestBatches = _retainedBatches.map { batchUIData =>
(batchUIData.batchTime.milliseconds, batchUIData.streamIdToInputInfo.mapValues(_.numRecords))
}
streamIds.map { streamId =>
- val eventRates = latestBatches.map {
+ val recordRates = latestBatches.map {
case (batchTime, streamIdToNumRecords) =>
val numRecords = streamIdToNumRecords.getOrElse(streamId, 0L)
(batchTime, numRecords * 1000.0 / batchDuration)
}
- (streamId, eventRates)
+ (streamId, recordRates)
}.toMap
}
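
[Editor's note] The rate computation above scales each batch's record count by 1000.0 / batchDuration (in milliseconds) to obtain records per second. A self-contained sketch of the same arithmetic, with hypothetical values:

// Hypothetical values for illustration only.
val batchDurationMs = 2000L   // 2-second batch interval
val numRecords = 5000L        // records received in one batch
// Same formula as in receivedRecordRateWithBatchTime above:
val recordsPerSec = numRecords * 1000.0 / batchDurationMs  // 2500.0 records/sec
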
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index fa40436221..b97e24f28b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -125,9 +125,9 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
* A helper class for "input rate" to generate data that will be used in the timeline and histogram
* graphs.
*
- * @param data (batchTime, event-rate).
+ * @param data (batch time, record rate).
*/
-private[ui] class EventRateUIData(val data: Seq[(Long, Double)]) {
+private[ui] class RecordRateUIData(val data: Seq[(Long, Double)]) {
val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)
@@ -215,7 +215,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
- val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo =>
+ val recordRateForAllStreams = new RecordRateUIData(batches.map { batchInfo =>
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
})
@@ -241,24 +241,24 @@ private[ui] class StreamingPage(parent: StreamingTab)
// Use the max input rate for all InputDStreams' graphs to make the Y axis ranges same.
// If it's not an integral number, just use its ceil integral number.
- val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
- val minEventRate = 0L
+ val maxRecordRate = recordRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
+ val minRecordRate = 0L
val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)
val jsCollector = new JsCollector
- val graphUIDataForEventRateOfAllStreams =
+ val graphUIDataForRecordRateOfAllStreams =
new GraphUIData(
- "all-stream-events-timeline",
- "all-stream-events-histogram",
- eventRateForAllStreams.data,
+ "all-stream-records-timeline",
+ "all-stream-records-histogram",
+ recordRateForAllStreams.data,
minBatchTime,
maxBatchTime,
- minEventRate,
- maxEventRate,
- "events/sec")
- graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector)
+ minRecordRate,
+ maxRecordRate,
+ "records/sec")
+ graphUIDataForRecordRateOfAllStreams.generateDataJs(jsCollector)
val graphUIDataForSchedulingDelay =
new GraphUIData(
@@ -334,16 +334,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
<div>Receivers: {listener.numActiveReceivers} / {numReceivers} active</div>
}
}
- <div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div>
+ <div>Avg: {recordRateForAllStreams.formattedAvg} records/sec</div>
</div>
</td>
- <td class="timeline">{graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
- <td class="histogram">{graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
+ <td class="timeline">{graphUIDataForRecordRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForRecordRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
</tr>
{if (hasStream) {
<tr id="inputs-table" style="display: none;" >
<td colspan="3">
- {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
+ {generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minRecordRate, maxRecordRate)}
</td>
</tr>
}}
@@ -390,15 +390,16 @@ private[ui] class StreamingPage(parent: StreamingTab)
maxX: Long,
minY: Double,
maxY: Double): Seq[Node] = {
- val maxYCalculated = listener.receivedEventRateWithBatchTime.values
- .flatMap { case streamAndRates => streamAndRates.map { case (_, eventRate) => eventRate } }
+ val maxYCalculated = listener.receivedRecordRateWithBatchTime.values
+ .flatMap { case streamAndRates => streamAndRates.map { case (_, recordRate) => recordRate } }
.reduceOption[Double](math.max)
.map(_.ceil.toLong)
.getOrElse(0L)
- val content = listener.receivedEventRateWithBatchTime.toList.sortBy(_._1).map {
- case (streamId, eventRates) =>
- generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxYCalculated)
+ val content = listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map {
+ case (streamId, recordRates) =>
+ generateInputDStreamRow(
+ jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated)
}.foldLeft[Seq[Node]](Nil)(_ ++ _)
// scalastyle:off
@@ -422,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
private def generateInputDStreamRow(
jsCollector: JsCollector,
streamId: Int,
- eventRates: Seq[(Long, Double)],
+ recordRates: Seq[(Long, Double)],
minX: Long,
maxX: Long,
minY: Double,
@@ -447,25 +448,25 @@ private[ui] class StreamingPage(parent: StreamingTab)
val receiverLastErrorTime = receiverInfo.map {
r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime)
}.getOrElse(emptyCell)
- val receivedRecords = new EventRateUIData(eventRates)
+ val receivedRecords = new RecordRateUIData(recordRates)
- val graphUIDataForEventRate =
+ val graphUIDataForRecordRate =
new GraphUIData(
- s"stream-$streamId-events-timeline",
- s"stream-$streamId-events-histogram",
+ s"stream-$streamId-records-timeline",
+ s"stream-$streamId-records-histogram",
receivedRecords.data,
minX,
maxX,
minY,
maxY,
- "events/sec")
- graphUIDataForEventRate.generateDataJs(jsCollector)
+ "records/sec")
+ graphUIDataForRecordRate.generateDataJs(jsCollector)
<tr>
<td rowspan="2" style="vertical-align: middle; width: 151px;">
<div style="width: 151px;">
<div style="word-wrap: break-word;"><strong>{receiverName}</strong></div>
- <div>Avg: {receivedRecords.formattedAvg} events/sec</div>
+ <div>Avg: {receivedRecords.formattedAvg} records/sec</div>
</div>
</td>
<td>{receiverActive}</td>
@@ -475,9 +476,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
</tr>
<tr>
<td colspan="3" class="timeline">
- {graphUIDataForEventRate.generateTimelineHtml(jsCollector)}
+ {graphUIDataForRecordRate.generateTimelineHtml(jsCollector)}
</td>
- <td class="histogram">{graphUIDataForEventRate.generateHistogramHtml(jsCollector)}</td>
+ <td class="histogram">{graphUIDataForRecordRate.generateHistogramHtml(jsCollector)}</td>
</tr>
}
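
[Editor's note] The renamed accessor receivedRecordRateWithBatchTime returns a Map[Int, Seq[(Long, Double)]] keyed by stream id, as shown above. A hedged sketch of consuming data in that shape, using hypothetical sample values rather than a live listener:

// Hypothetical sample in the shape the accessor returns.
val ratesByStream: Map[Int, Seq[(Long, Double)]] =
  Map(0 -> Seq((1459548000000L, 2500.0), (1459548002000L, 2600.0)))

ratesByStream.foreach { case (streamId, recordRates) =>
  recordRates.foreach { case (batchTimeMs, rate) =>
    println(s"stream $streamId @ $batchTimeMs ms: $rate records/sec")
  }
}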