author     zsxwing <zsxwing@gmail.com>           2015-05-14 16:58:36 -0700
committer  Andrew Or <andrew@databricks.com>     2015-05-14 16:58:36 -0700
commit     b208f998b5800bdba4ce6651f172c26a8d7d351b (patch)
tree       6f32115e2e8aa7d56d54b6526ebfd83f8bc63e4f /streaming
parent     0a317c124c3a43089cdb8f079345c8f2842238cd (diff)
[SPARK-7645] [STREAMING] [WEBUI] Show milliseconds in the UI if the batch interval < 1 second
I also updated the summary of the Streaming page.

![screen shot 2015-05-14 at 11 52 59 am](https://cloud.githubusercontent.com/assets/1000778/7640103/13cdf68e-fa36-11e4-84ec-e2a3954f4319.png)
![screen shot 2015-05-14 at 12 39 33 pm](https://cloud.githubusercontent.com/assets/1000778/7640151/4cc066ac-fa36-11e4-8494-2821d6a6f17c.png)

Author: zsxwing <zsxwing@gmail.com>

Closes #6154 from zsxwing/SPARK-7645 and squashes the following commits:

5db6ca1 [zsxwing] Add UIUtils.formatBatchTime
e4802df [zsxwing] Show milliseconds in the UI if the batch interval < 1 second
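To make the intended behaviour of the new helper concrete before reading the hunks, here is a minimal sketch based on the UIUtilsSuite assertions at the bottom of this diff. The demo object and its location are hypothetical, not part of the patch (it has to live under org.apache.spark.streaming because UIUtils is private[streaming]), and the explicit timezone is only there to make the output deterministic:

    package org.apache.spark.streaming.ui

    import java.util.TimeZone

    // Hypothetical demo, not part of the patch. It sits in this package only
    // because UIUtils is private[streaming].
    object FormatBatchTimeDemo {
      def main(args: Array[String]): Unit = {
        val tz = TimeZone.getTimeZone("America/Los_Angeles")
        val batchTime = 1431637480452L // Thu May 14 14:04:40.452 PDT 2015

        // batchInterval >= 1 second: milliseconds are dropped.
        println(UIUtils.formatBatchTime(batchTime, 1000, timezone = tz)) // 2015/05/14 14:04:40

        // batchInterval < 1 second: milliseconds are shown.
        println(UIUtils.formatBatchTime(batchTime, 999, timezone = tz))  // 2015/05/14 14:04:40.452

        // showYYYYMMSS = false drops the date part (used for the graph time axis).
        println(UIUtils.formatBatchTime(batchTime, 999, showYYYYMMSS = false, timezone = tz)) // 14:04:40.452
      }
    }

Per the Scaladoc added in UIUtils.scala below, the timezone parameter exists only for tests; in the UI itself the JVM default timezone is used.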
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala | 14
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala       |  5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala   | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala         | 55
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala    | 11
5 files changed, 84 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 3619e129ad..00cc47d6a3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -17,11 +17,14 @@
package org.apache.spark.streaming.ui
+import java.text.SimpleDateFormat
+import java.util.Date
+
import scala.xml.Node
import org.apache.spark.ui.{UIUtils => SparkUIUtils}
-private[ui] abstract class BatchTableBase(tableId: String) {
+private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
protected def columns: Seq[Node] = {
<th>Batch Time</th>
@@ -35,7 +38,7 @@ private[ui] abstract class BatchTableBase(tableId: String) {
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
- val formattedBatchTime = SparkUIUtils.formatDate(batch.batchTime.milliseconds)
+ val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
val eventCount = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
@@ -79,7 +82,8 @@ private[ui] abstract class BatchTableBase(tableId: String) {
private[ui] class ActiveBatchTable(
runningBatches: Seq[BatchUIData],
- waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") {
+ waitingBatches: Seq[BatchUIData],
+ batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>
@@ -99,8 +103,8 @@ private[ui] class ActiveBatchTable(
}
}
-private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
- extends BatchTableBase("completed-batches-table") {
+private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
+ extends BatchTableBase("completed-batches-table", batchInterval) {
override protected def columns: Seq[Node] = super.columns ++
<th>Total Delay
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 831f60e870..f75067669a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.ui
+import java.text.SimpleDateFormat
+import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.xml.{NodeSeq, Node, Text}
@@ -288,7 +290,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
throw new IllegalArgumentException(s"Missing id parameter")
}
- val formattedBatchTime = SparkUIUtils.formatDate(batchTime.milliseconds)
+ val formattedBatchTime =
+ UIUtils.formatBatchTime(batchTime.milliseconds, streamingListener.batchDuration)
val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index efce8c58fb..070564aa10 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -186,6 +186,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
<strong>
{SparkUIUtils.formatDate(startTime)}
</strong>
+ (<strong>{listener.numTotalCompletedBatches}</strong>
+ completed batches, <strong>{listener.numTotalReceivedRecords}</strong> records)
</div>
<br />
}
@@ -199,9 +201,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
* @param times all time values that will be used in the graphs.
*/
private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
- val dateFormat = new SimpleDateFormat("HH:mm:ss")
val js = "var timeFormat = {};\n" + times.map { time =>
- val formattedTime = dateFormat.format(new Date(time))
+ val formattedTime =
+ UIUtils.formatBatchTime(time, listener.batchDuration, showYYYYMMSS = false)
s"timeFormat[$time] = '$formattedTime';"
}.mkString("\n")
@@ -472,14 +474,14 @@ private[ui] class StreamingPage(parent: StreamingTab)
val activeBatchesContent = {
<h4 id="active">Active Batches ({runningBatches.size + waitingBatches.size})</h4> ++
- new ActiveBatchTable(runningBatches, waitingBatches).toNodeSeq
+ new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq
}
val completedBatchesContent = {
<h4 id="completed">
Completed Batches (last {completedBatches.size} out of {listener.numTotalCompletedBatches})
</h4> ++
- new CompletedBatchTable(completedBatches).toNodeSeq
+ new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq
}
activeBatchesContent ++ completedBatchesContent
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index f153ee105a..86cfb1fa47 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.ui
+import java.text.SimpleDateFormat
+import java.util.TimeZone
import java.util.concurrent.TimeUnit
private[streaming] object UIUtils {
@@ -62,7 +64,7 @@ private[streaming] object UIUtils {
* Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
* will discard the fractional part.
*/
- def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
+ def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
case TimeUnit.MICROSECONDS => milliseconds * 1000
case TimeUnit.MILLISECONDS => milliseconds
@@ -71,4 +73,55 @@ private[streaming] object UIUtils {
case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
}
+
+ // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+ private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ }
+
+ private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS")
+ }
+
+ /**
+ * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise,
+ * format `batchTime` without milliseconds.
+ *
+ * @param batchTime the batch time to be formatted
+ * @param batchInterval the batch interval
+ * @param showYYYYMMSS if showing the `yyyy/MM/dd` part. If it's false, the return value will be
+ * only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval`
+ * @param timezone only for test
+ */
+ def formatBatchTime(
+ batchTime: Long,
+ batchInterval: Long,
+ showYYYYMMSS: Boolean = true,
+ timezone: TimeZone = null): String = {
+ val oldTimezones =
+ (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone)
+ if (timezone != null) {
+ batchTimeFormat.get.setTimeZone(timezone)
+ batchTimeFormatWithMilliseconds.get.setTimeZone(timezone)
+ }
+ try {
+ val formattedBatchTime =
+ if (batchInterval < 1000) {
+ batchTimeFormatWithMilliseconds.get.format(batchTime)
+ } else {
+ // If batchInterval >= 1 second, don't show milliseconds
+ batchTimeFormat.get.format(batchTime)
+ }
+ if (showYYYYMMSS) {
+ formattedBatchTime
+ } else {
+ formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1)
+ }
+ } finally {
+ if (timezone != null) {
+ batchTimeFormat.get.setTimeZone(oldTimezones._1)
+ batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2)
+ }
+ }
+ }
}
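Aside, not part of the patch: the ThreadLocal wrappers above exist because SimpleDateFormat keeps mutable internal state and is not thread-safe, and several UI request threads may format batch times at the same time. A self-contained sketch of the same pattern, with hypothetical names and an executor used purely for illustration:

    import java.text.SimpleDateFormat
    import java.util.Date
    import java.util.concurrent.{Executors, TimeUnit}

    object ThreadLocalFormatterSketch {
      // Each thread lazily gets its own SimpleDateFormat, so no instance is ever shared.
      private val format = new ThreadLocal[SimpleDateFormat] {
        override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
      }

      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(8)
        (0 until 1000).foreach { i =>
          pool.submit(new Runnable {
            // Safe under concurrency: format.get returns the calling thread's private formatter.
            override def run(): Unit = format.get.format(new Date(1431637480452L + i))
          })
        }
        pool.shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS)
      }
    }

A per-thread formatter avoids corrupted output without synchronizing on a shared instance or constructing a new SimpleDateFormat on every call.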
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
index 6df1a63ab2..e9ab917ab8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.ui
+import java.util.TimeZone
import java.util.concurrent.TimeUnit
import org.scalatest.FunSuite
@@ -64,4 +65,14 @@ class UIUtilsSuite extends FunSuite with Matchers{
val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit)
convertedTime should be (expectedTime +- 1E-6)
}
+
+ test("formatBatchTime") {
+ val tzForTest = TimeZone.getTimeZone("America/Los_Angeles")
+ val batchTime = 1431637480452L // Thu May 14 14:04:40 PDT 2015
+ assert("2015/05/14 14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, timezone = tzForTest))
+ assert("2015/05/14 14:04:40.452" ===
+ UIUtils.formatBatchTime(batchTime, 999, timezone = tzForTest))
+ assert("14:04:40" === UIUtils.formatBatchTime(batchTime, 1000, false, timezone = tzForTest))
+ assert("14:04:40.452" === UIUtils.formatBatchTime(batchTime, 999, false, timezone = tzForTest))
+ }
}