aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala10
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala3
2 files changed, 7 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 84f80e638f..be1e8686cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -149,7 +149,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}.toMap
}
- def lastReceivedBatchRecords: Map[Int, Long] = {
+ def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numReceivers).map { receiverId =>
@@ -160,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
- def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
+ def receiverInfo(receiverId: Int): Option[ReceiverInfo] = synchronized {
receiverInfos.get(receiverId)
}
- def lastCompletedBatch: Option[BatchInfo] = {
+ def lastCompletedBatch: Option[BatchInfo] = synchronized {
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
}
- def lastReceivedBatch: Option[BatchInfo] = {
+ def lastReceivedBatch: Option[BatchInfo] = synchronized {
retainedBatches.lastOption
}
- private def retainedBatches: Seq[BatchInfo] = synchronized {
+ private def retainedBatches: Seq[BatchInfo] = {
(waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index bfe8086fcf..b6dcb62bfe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -37,11 +37,12 @@ private[ui] class StreamingPage(parent: StreamingTab)
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
- val content =
+ val content = listener.synchronized {
generateBasicStats() ++ <br></br> ++
<h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
generateReceiverStats() ++
generateBatchStatsTable()
+ }
UIUtils.headerSparkPage("Streaming", content, parent, Some(5000))
}