aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-09-12 10:40:03 -0700
committerAndrew Or <andrewor14@gmail.com>2014-09-12 10:40:03 -0700
commitf17b7957a4283952021d9e4106c5bd9994148128 (patch)
tree0e5e3e87b9508483da39b8b5450b553c5172f520 /streaming
parente69deb81842639ee089b518e994080e27a343297 (diff)
downloadspark-f17b7957a4283952021d9e4106c5bd9994148128.tar.gz
spark-f17b7957a4283952021d9e4106c5bd9994148128.tar.bz2
spark-f17b7957a4283952021d9e4106c5bd9994148128.zip
Revert "[Spark-3490] Disable SparkUI for tests"
This reverts commit 2ffc7980c6818eec05e32141c52e335bc71daed9.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala11
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala25
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala16
4 files changed, 13 insertions, 41 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 4fc77bbe1a..101cec1c7a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
+import org.apache.spark.streaming.ui.StreamingTab
import org.apache.spark.util.MetadataCleaner
/**
@@ -158,14 +158,7 @@ class StreamingContext private[streaming] (
private[streaming] val waiter = new ContextWaiter
- private[streaming] val progressListener = new StreamingJobProgressListener(this)
-
- private[streaming] val uiTab: Option[StreamingTab] =
- if (conf.getBoolean("spark.ui.enabled", true)) {
- Some(new StreamingTab(this))
- } else {
- None
- }
+ private[streaming] val uiTab = new StreamingTab(this)
/** Register streaming source to metrics system */
private val streamingSource = new StreamingSource(this)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index e35a568ddf..75f0e8716d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
override val metricRegistry = new MetricRegistry
override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
- private val streamingListener = ssc.progressListener
+ private val streamingListener = ssc.uiTab.listener
private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
defaultValue: T) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index d9d04cd706..34ac254f33 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -17,31 +17,18 @@
package org.apache.spark.streaming.ui
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.Logging
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.ui.{SparkUI, SparkUITab}
+import org.apache.spark.ui.SparkUITab
-import StreamingTab._
-
-/**
- * Spark Web UI tab that shows statistics of a streaming job.
- * This assumes the given SparkContext has enabled its SparkUI.
- */
+/** Spark Web UI tab that shows statistics of a streaming job */
private[spark] class StreamingTab(ssc: StreamingContext)
- extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
+ extends SparkUITab(ssc.sc.ui, "streaming") with Logging {
- val parent = getSparkUI(ssc)
- val listener = ssc.progressListener
+ val parent = ssc.sc.ui
+ val listener = new StreamingJobProgressListener(ssc)
ssc.addStreamingListener(listener)
attachPage(new StreamingPage(this))
parent.attachTab(this)
}
-
-private object StreamingTab {
- def getSparkUI(ssc: StreamingContext): SparkUI = {
- ssc.sc.ui.getOrElse {
- throw new SparkException("Parent SparkUI to attach this tab to not found!")
- }
- }
-}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
index 4c7e43c294..2a0db75649 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -24,22 +24,13 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.apache.spark.SparkConf
-
class UISuite extends FunSuite {
// Ignored: See SPARK-1530
ignore("streaming tab in spark UI") {
- val conf = new SparkConf()
- .setMaster("local")
- .setAppName("test")
- .set("spark.ui.enabled", "true")
- val ssc = new StreamingContext(conf, Seconds(1))
- assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
- val ui = ssc.sc.ui.get
-
+ val ssc = new StreamingContext("local", "test", Seconds(1))
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(ui.appUIAddress).mkString
+ val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
// test if streaming tab exist
assert(html.toLowerCase.contains("streaming"))
@@ -48,7 +39,8 @@ class UISuite extends FunSuite {
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+ val html = Source.fromURL(
+ ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
assert(html.toLowerCase.contains("batch"))
assert(html.toLowerCase.contains("network"))
}