aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-04-13 12:21:29 +0100
committerSean Owen <sowen@cloudera.com>2015-04-13 12:21:29 +0100
commit14ce3ea2c9546c58203af85aceb76b1bfc1f650a (patch)
tree6e95e4030e0e67ea8307239c375551cdec09e3ae /streaming/src
parentcadd7d72c52ccc8d2def405a77dcf807fb5c17c2 (diff)
downloadspark-14ce3ea2c9546c58203af85aceb76b1bfc1f650a.tar.gz
spark-14ce3ea2c9546c58203af85aceb76b1bfc1f650a.tar.bz2
spark-14ce3ea2c9546c58203af85aceb76b1bfc1f650a.zip
[SPARK-6860][Streaming][WebUI] Fix the possible inconsistency of StreamingPage
Because `StreamingPage.render` doesn't hold the `listener` lock when generating the content, the different parts of content may have some inconsistent values if `listener` updates its status at the same time. And it will confuse people. This PR added `listener.synchronized` to make sure we have a consistent view of StreamingJobProgressListener when creating the content. Author: zsxwing <zsxwing@gmail.com> Closes #5470 from zsxwing/SPARK-6860 and squashes the following commits: cec6f92 [zsxwing] Add missing 'synchronized' in StreamingJobProgressListener 7182498 [zsxwing] Add synchronized to make sure we have a consistent view of StreamingJobProgressListener when creating the content
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala10
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala3
2 files changed, 7 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 84f80e638f..be1e8686cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -149,7 +149,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}.toMap
}
- def lastReceivedBatchRecords: Map[Int, Long] = {
+ def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numReceivers).map { receiverId =>
@@ -160,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
- def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
+ def receiverInfo(receiverId: Int): Option[ReceiverInfo] = synchronized {
receiverInfos.get(receiverId)
}
- def lastCompletedBatch: Option[BatchInfo] = {
+ def lastCompletedBatch: Option[BatchInfo] = synchronized {
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
}
- def lastReceivedBatch: Option[BatchInfo] = {
+ def lastReceivedBatch: Option[BatchInfo] = synchronized {
retainedBatches.lastOption
}
- private def retainedBatches: Seq[BatchInfo] = synchronized {
+ private def retainedBatches: Seq[BatchInfo] = {
(waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index bfe8086fcf..b6dcb62bfe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -37,11 +37,12 @@ private[ui] class StreamingPage(parent: StreamingTab)
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
- val content =
+ val content = listener.synchronized {
generateBasicStats() ++ <br></br> ++
<h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
generateReceiverStats() ++
generateBatchStatsTable()
+ }
UIUtils.headerSparkPage("Streaming", content, parent, Some(5000))
}