diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-09-12 10:40:03 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-09-12 10:40:03 -0700 |
commit | f17b7957a4283952021d9e4106c5bd9994148128 (patch) | |
tree | 0e5e3e87b9508483da39b8b5450b553c5172f520 /streaming | |
parent | e69deb81842639ee089b518e994080e27a343297 (diff) | |
download | spark-f17b7957a4283952021d9e4106c5bd9994148128.tar.gz spark-f17b7957a4283952021d9e4106c5bd9994148128.tar.bz2 spark-f17b7957a4283952021d9e4106c5bd9994148128.zip |
Revert "[Spark-3490] Disable SparkUI for tests"
This reverts commit 2ffc7980c6818eec05e32141c52e335bc71daed9.
Diffstat (limited to 'streaming')
4 files changed, 13 insertions, 41 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 4fc77bbe1a..101cec1c7a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver} import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} +import org.apache.spark.streaming.ui.StreamingTab import org.apache.spark.util.MetadataCleaner /** @@ -158,14 +158,7 @@ class StreamingContext private[streaming] ( private[streaming] val waiter = new ContextWaiter - private[streaming] val progressListener = new StreamingJobProgressListener(this) - - private[streaming] val uiTab: Option[StreamingTab] = - if (conf.getBoolean("spark.ui.enabled", true)) { - Some(new StreamingTab(this)) - } else { - None - } + private[streaming] val uiTab = new StreamingTab(this) /** Register streaming source to metrics system */ private val streamingSource = new StreamingSource(this) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index e35a568ddf..75f0e8716d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { override val metricRegistry = new MetricRegistry override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName) - private val streamingListener = ssc.progressListener + private val streamingListener = ssc.uiTab.listener private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, defaultValue: T) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index d9d04cd706..34ac254f33 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -17,31 +17,18 @@ package org.apache.spark.streaming.ui -import org.apache.spark.{Logging, SparkException} +import org.apache.spark.Logging import org.apache.spark.streaming.StreamingContext -import org.apache.spark.ui.{SparkUI, SparkUITab} +import org.apache.spark.ui.SparkUITab -import StreamingTab._ - -/** - * Spark Web UI tab that shows statistics of a streaming job. - * This assumes the given SparkContext has enabled its SparkUI. - */ +/** Spark Web UI tab that shows statistics of a streaming job */ private[spark] class StreamingTab(ssc: StreamingContext) - extends SparkUITab(getSparkUI(ssc), "streaming") with Logging { + extends SparkUITab(ssc.sc.ui, "streaming") with Logging { - val parent = getSparkUI(ssc) - val listener = ssc.progressListener + val parent = ssc.sc.ui + val listener = new StreamingJobProgressListener(ssc) ssc.addStreamingListener(listener) attachPage(new StreamingPage(this)) parent.attachTab(this) } - -private object StreamingTab { - def getSparkUI(ssc: StreamingContext): SparkUI = { - ssc.sc.ui.getOrElse { - throw new SparkException("Parent SparkUI to attach this tab to not found!") - } - } -} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala index 4c7e43c294..2a0db75649 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -24,22 +24,13 @@ import org.scalatest.FunSuite import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.SparkConf - class UISuite extends FunSuite { // Ignored: See SPARK-1530 ignore("streaming tab in spark UI") { - val conf = new SparkConf() - .setMaster("local") - .setAppName("test") - .set("spark.ui.enabled", "true") - val ssc = new StreamingContext(conf, Seconds(1)) - assert(ssc.sc.ui.isDefined, "Spark UI is not started!") - val ui = ssc.sc.ui.get - + val ssc = new StreamingContext("local", "test", Seconds(1)) eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL(ui.appUIAddress).mkString + val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString assert(!html.contains("random data that should not be present")) // test if streaming tab exist assert(html.toLowerCase.contains("streaming")) @@ -48,7 +39,8 @@ class UISuite extends FunSuite { } eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString + val html = Source.fromURL( + ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString assert(html.toLowerCase.contains("batch")) assert(html.toLowerCase.contains("network")) } |