aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala8
1 files changed, 7 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 88a4483e80..b3692c3ea3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -392,9 +392,15 @@ private[ui] class StreamingPage(parent: StreamingTab)
maxX: Long,
minY: Double,
maxY: Double): Seq[Node] = {
+ val maxYCalculated = listener.receivedEventRateWithBatchTime.values
+ .flatMap { case streamAndRates => streamAndRates.map { case (_, eventRate) => eventRate } }
+ .reduceOption[Double](math.max)
+ .map(_.ceil.toLong)
+ .getOrElse(0L)
+
val content = listener.receivedEventRateWithBatchTime.toList.sortBy(_._1).map {
case (streamId, eventRates) =>
- generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxY)
+ generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxYCalculated)
}.foldLeft[Seq[Node]](Nil)(_ ++ _)
// scalastyle:off