aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-09-11 17:18:46 -0700
committerAndrew Or <andrewor14@gmail.com>2014-09-11 17:18:46 -0700
commit6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8 (patch)
tree2286711eef4eaf26d023aa609b2f5d11dafdfdec /streaming
parent4bc9e046cb8922923dff254e3e621fb4de656f98 (diff)
downloadspark-6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8.tar.gz
spark-6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8.tar.bz2
spark-6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8.zip
[Spark-3490] Disable SparkUI for tests
We currently open many ephemeral ports during the tests, and as a result we occasionally can't bind to new ones. This has caused the `DriverSuite` and the `SparkSubmitSuite` to fail intermittently. By disabling the `SparkUI` when it's not needed, we already cut down on the number of ports opened significantly, on the order of the number of `SparkContexts` ever created. We must keep it enabled for a few tests for the UI itself, however. Author: Andrew Or <andrewor14@gmail.com> Closes #2363 from andrewor14/disable-ui-for-tests and squashes the following commits: 332a7d5 [Andrew Or] No need to set spark.ui.port to 0 anymore 30c93a2 [Andrew Or] Simplify streaming UISuite a431b84 [Andrew Or] Fix streaming test failures 8f5ae53 [Andrew Or] Fix no new line at the end 29c9b5b [Andrew Or] Disable SparkUI for tests
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala11
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala25
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala16
4 files changed, 41 insertions, 13 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 457e8ab28e..f63560dcb5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.MetadataCleaner
/**
@@ -158,7 +158,14 @@ class StreamingContext private[streaming] (
private[streaming] val waiter = new ContextWaiter
- private[streaming] val uiTab = new StreamingTab(this)
+ private[streaming] val progressListener = new StreamingJobProgressListener(this)
+
+ private[streaming] val uiTab: Option[StreamingTab] =
+ if (conf.getBoolean("spark.ui.enabled", true)) {
+ Some(new StreamingTab(this))
+ } else {
+ None
+ }
/** Register streaming source to metrics system */
private val streamingSource = new StreamingSource(this)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index 75f0e8716d..e35a568ddf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
override val metricRegistry = new MetricRegistry
override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
- private val streamingListener = ssc.uiTab.listener
+ private val streamingListener = ssc.progressListener
private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
defaultValue: T) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 34ac254f33..d9d04cd706 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -17,18 +17,31 @@
package org.apache.spark.streaming.ui
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.ui.SparkUITab
+import org.apache.spark.ui.{SparkUI, SparkUITab}
-/** Spark Web UI tab that shows statistics of a streaming job */
+import StreamingTab._
+
+/**
+ * Spark Web UI tab that shows statistics of a streaming job.
+ * This assumes the given SparkContext has enabled its SparkUI.
+ */
private[spark] class StreamingTab(ssc: StreamingContext)
- extends SparkUITab(ssc.sc.ui, "streaming") with Logging {
+ extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
- val parent = ssc.sc.ui
- val listener = new StreamingJobProgressListener(ssc)
+ val parent = getSparkUI(ssc)
+ val listener = ssc.progressListener
ssc.addStreamingListener(listener)
attachPage(new StreamingPage(this))
parent.attachTab(this)
}
+
+private object StreamingTab {
+ def getSparkUI(ssc: StreamingContext): SparkUI = {
+ ssc.sc.ui.getOrElse {
+ throw new SparkException("Parent SparkUI to attach this tab to not found!")
+ }
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
index 2a0db75649..4c7e43c294 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -24,13 +24,22 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
+import org.apache.spark.SparkConf
+
class UISuite extends FunSuite {
// Ignored: See SPARK-1530
ignore("streaming tab in spark UI") {
- val ssc = new StreamingContext("local", "test", Seconds(1))
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test")
+ .set("spark.ui.enabled", "true")
+ val ssc = new StreamingContext(conf, Seconds(1))
+ assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
+ val ui = ssc.sc.ui.get
+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+ val html = Source.fromURL(ui.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
// test if streaming tab exist
assert(html.toLowerCase.contains("streaming"))
@@ -39,8 +48,7 @@ class UISuite extends FunSuite {
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(
- ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+ val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
assert(html.toLowerCase.contains("batch"))
assert(html.toLowerCase.contains("network"))
}