diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-09-11 17:18:46 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-09-11 17:18:46 -0700 |
commit | 6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8 (patch) | |
tree | 2286711eef4eaf26d023aa609b2f5d11dafdfdec /core | |
parent | 4bc9e046cb8922923dff254e3e621fb4de656f98 (diff) | |
download | spark-6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8.tar.gz spark-6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8.tar.bz2 spark-6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8.zip |
[Spark-3490] Disable SparkUI for tests
We currently open many ephemeral ports during the tests, and as a result we occasionally can't bind to new ones. This has caused the `DriverSuite` and the `SparkSubmitSuite` to fail intermittently.
By disabling the `SparkUI` when it's not needed, we already cut down on the number of ports opened significantly, on the order of the number of `SparkContexts` ever created. We must keep it enabled for a few tests for the UI itself, however.
Author: Andrew Or <andrewor14@gmail.com>
Closes #2363 from andrewor14/disable-ui-for-tests and squashes the following commits:
332a7d5 [Andrew Or] No need to set spark.ui.port to 0 anymore
30c93a2 [Andrew Or] Simplify streaming UISuite
a431b84 [Andrew Or] Fix streaming test failures
8f5ae53 [Andrew Or] Fix no new line at the end
29c9b5b [Andrew Or] Disable SparkUI for tests
Diffstat (limited to 'core')
5 files changed, 47 insertions, 21 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c6c5b8f22b..218b353dd9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging { new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf) // Initialize the Spark UI, registering all associated listeners - private[spark] val ui = new SparkUI(this) - ui.bind() + private[spark] val ui: Option[SparkUI] = + if (conf.getBoolean("spark.ui.enabled", true)) { + Some(new SparkUI(this)) + } else { + // For tests, do not enable the UI + None + } + ui.foreach(_.bind()) /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf) @@ -990,7 +996,7 @@ class SparkContext(config: SparkConf) extends Logging { /** Shut down the SparkContext. */ def stop() { postApplicationEnd() - ui.stop() + ui.foreach(_.stop()) // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. val dagSchedulerCopy = dagScheduler diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 5b5257269d..9a0cb1c6c6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase") conf.set("spark.ui.filters", filterName) conf.set(s"spark.$filterName.params", filterParams) - JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf) + scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) } } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 513d74a08a..ee10aa061f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler.cluster -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.{Logging, SparkContext, SparkEnv} @@ -47,16 +46,17 @@ private[spark] class SimrSchedulerBackend( val conf = SparkHadoopUtil.get.newConfiguration(sc.conf) val fs = FileSystem.get(conf) + val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") logInfo("Writing to HDFS file: " + driverFilePath) logInfo("Writing Akka address: " + driverUrl) - logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress) + logInfo("Writing Spark UI Address: " + appUIAddress) // Create temporary file to prevent race condition where executors get empty driverUrl file val temp = fs.create(tmpPath, true) temp.writeUTF(driverUrl) temp.writeInt(maxCores) - temp.writeUTF(sc.ui.appUIAddress) + temp.writeUTF(appUIAddress) temp.close() // "Atomic" rename diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 06872ace2e..2f45d192e1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -67,8 +67,10 @@ private[spark] class SparkDeploySchedulerBackend( val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts) + val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") + val eventLogDir = sc.eventLogger.map(_.logDir) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) + appUIAddress, eventLogDir) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 038746d2ed..2f56642956 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -36,11 +36,25 @@ import scala.xml.Node class UISuite extends FunSuite { + /** + * Create a test SparkContext with the SparkUI enabled. + * It is safe to `get` the SparkUI directly from the SparkContext returned here. + */ + private def newSparkContext(): SparkContext = { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set("spark.ui.enabled", "true") + val sc = new SparkContext(conf) + assert(sc.ui.isDefined) + sc + } + ignore("basic ui visibility") { - withSpark(new SparkContext("local", "test")) { sc => + withSpark(newSparkContext()) { sc => // test if the ui is visible, and all the expected tabs are visible eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL(sc.ui.appUIAddress).mkString + val html = Source.fromURL(sc.ui.get.appUIAddress).mkString assert(!html.contains("random data that should not be present")) assert(html.toLowerCase.contains("stages")) assert(html.toLowerCase.contains("storage")) @@ -51,7 +65,7 @@ class UISuite extends FunSuite { } ignore("visibility at localhost:4040") { - withSpark(new SparkContext("local", "test")) { sc => + withSpark(newSparkContext()) { sc => // test if visible from http://localhost:4040 eventually(timeout(10 seconds), interval(50 milliseconds)) { val html = Source.fromURL("http://localhost:4040").mkString @@ -61,8 +75,8 @@ class UISuite extends FunSuite { } ignore("attaching a new tab") { - withSpark(new SparkContext("local", "test")) { sc => - val sparkUI = sc.ui + withSpark(newSparkContext()) { sc => + val sparkUI = sc.ui.get val newTab = new WebUITab(sparkUI, "foo") { attachPage(new WebUIPage("") { @@ -73,7 +87,7 @@ class UISuite extends FunSuite { } sparkUI.attachTab(newTab) eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL(sc.ui.appUIAddress).mkString + val html = Source.fromURL(sparkUI.appUIAddress).mkString assert(!html.contains("random data that should not be present")) // check whether new page exists @@ -87,7 +101,7 @@ class UISuite extends FunSuite { } eventually(timeout(10 seconds), interval(50 milliseconds)) { - val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString + val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString // check whether new page exists assert(html.contains("magic")) } @@ -129,16 +143,20 @@ class UISuite extends FunSuite { } test("verify appUIAddress contains the scheme") { - withSpark(new SparkContext("local", "test")) { sc => - val uiAddress = sc.ui.appUIAddress - assert(uiAddress.equals("http://" + sc.ui.appUIHostPort)) + withSpark(newSparkContext()) { sc => + val ui = sc.ui.get + val uiAddress = ui.appUIAddress + val uiHostPort = ui.appUIHostPort + assert(uiAddress.equals("http://" + uiHostPort)) } } test("verify appUIAddress contains the port") { - withSpark(new SparkContext("local", "test")) { sc => - val splitUIAddress = sc.ui.appUIAddress.split(':') - assert(splitUIAddress(2).toInt == sc.ui.boundPort) + withSpark(newSparkContext()) { sc => + val ui = sc.ui.get + val splitUIAddress = ui.appUIAddress.split(':') + val boundPort = ui.boundPort + assert(splitUIAddress(2).toInt == boundPort) } } } |