aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISuite.scala44
-rw-r--r--pom.xml2
-rw-r--r--project/SparkBuild.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala11
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala25
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala16
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala2
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala2
13 files changed, 92 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c6c5b8f22b..218b353dd9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging {
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
// Initialize the Spark UI, registering all associated listeners
- private[spark] val ui = new SparkUI(this)
- ui.bind()
+ private[spark] val ui: Option[SparkUI] =
+ if (conf.getBoolean("spark.ui.enabled", true)) {
+ Some(new SparkUI(this))
+ } else {
+ // For tests, do not enable the UI
+ None
+ }
+ ui.foreach(_.bind())
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
@@ -990,7 +996,7 @@ class SparkContext(config: SparkConf) extends Logging {
/** Shut down the SparkContext. */
def stop() {
postApplicationEnd()
- ui.stop()
+ ui.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5b5257269d..9a0cb1c6c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
conf.set("spark.ui.filters", filterName)
conf.set(s"spark.$filterName.params", filterParams)
- JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+ scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 513d74a08a..ee10aa061f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler.cluster
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.{Logging, SparkContext, SparkEnv}
@@ -47,16 +46,17 @@ private[spark] class SimrSchedulerBackend(
val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)
+ val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
logInfo("Writing to HDFS file: " + driverFilePath)
logInfo("Writing Akka address: " + driverUrl)
- logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
+ logInfo("Writing Spark UI Address: " + appUIAddress)
// Create temporary file to prevent race condition where executors get empty driverUrl file
val temp = fs.create(tmpPath, true)
temp.writeUTF(driverUrl)
temp.writeInt(maxCores)
- temp.writeUTF(sc.ui.appUIAddress)
+ temp.writeUTF(appUIAddress)
temp.close()
// "Atomic" rename
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 06872ace2e..2f45d192e1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -67,8 +67,10 @@ private[spark] class SparkDeploySchedulerBackend(
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
+ val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
+ val eventLogDir = sc.eventLogger.map(_.logDir)
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+ appUIAddress, eventLogDir)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 038746d2ed..2f56642956 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -36,11 +36,25 @@ import scala.xml.Node
class UISuite extends FunSuite {
+ /**
+ * Create a test SparkContext with the SparkUI enabled.
+ * It is safe to `get` the SparkUI directly from the SparkContext returned here.
+ */
+ private def newSparkContext(): SparkContext = {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test")
+ .set("spark.ui.enabled", "true")
+ val sc = new SparkContext(conf)
+ assert(sc.ui.isDefined)
+ sc
+ }
+
ignore("basic ui visibility") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(newSparkContext()) { sc =>
// test if the ui is visible, and all the expected tabs are visible
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(sc.ui.appUIAddress).mkString
+ val html = Source.fromURL(sc.ui.get.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
assert(html.toLowerCase.contains("stages"))
assert(html.toLowerCase.contains("storage"))
@@ -51,7 +65,7 @@ class UISuite extends FunSuite {
}
ignore("visibility at localhost:4040") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(newSparkContext()) { sc =>
// test if visible from http://localhost:4040
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL("http://localhost:4040").mkString
@@ -61,8 +75,8 @@ class UISuite extends FunSuite {
}
ignore("attaching a new tab") {
- withSpark(new SparkContext("local", "test")) { sc =>
- val sparkUI = sc.ui
+ withSpark(newSparkContext()) { sc =>
+ val sparkUI = sc.ui.get
val newTab = new WebUITab(sparkUI, "foo") {
attachPage(new WebUIPage("") {
@@ -73,7 +87,7 @@ class UISuite extends FunSuite {
}
sparkUI.attachTab(newTab)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(sc.ui.appUIAddress).mkString
+ val html = Source.fromURL(sparkUI.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
// check whether new page exists
@@ -87,7 +101,7 @@ class UISuite extends FunSuite {
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
+ val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
// check whether new page exists
assert(html.contains("magic"))
}
@@ -129,16 +143,20 @@ class UISuite extends FunSuite {
}
test("verify appUIAddress contains the scheme") {
- withSpark(new SparkContext("local", "test")) { sc =>
- val uiAddress = sc.ui.appUIAddress
- assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
+ withSpark(newSparkContext()) { sc =>
+ val ui = sc.ui.get
+ val uiAddress = ui.appUIAddress
+ val uiHostPort = ui.appUIHostPort
+ assert(uiAddress.equals("http://" + uiHostPort))
}
}
test("verify appUIAddress contains the port") {
- withSpark(new SparkContext("local", "test")) { sc =>
- val splitUIAddress = sc.ui.appUIAddress.split(':')
- assert(splitUIAddress(2).toInt == sc.ui.boundPort)
+ withSpark(newSparkContext()) { sc =>
+ val ui = sc.ui.get
+ val splitUIAddress = ui.appUIAddress.split(':')
+ val boundPort = ui.boundPort
+ assert(splitUIAddress(2).toInt == boundPort)
}
}
}
diff --git a/pom.xml b/pom.xml
index 64fb1e57e3..e5f863e854 100644
--- a/pom.xml
+++ b/pom.xml
@@ -899,7 +899,7 @@
<java.awt.headless>true</java.awt.headless>
<spark.test.home>${session.executionRootDirectory}</spark.test.home>
<spark.testing>1</spark.testing>
- <spark.ui.port>0</spark.ui.port>
+ <spark.ui.enabled>false</spark.ui.enabled>
</systemProperties>
</configuration>
<executions>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 45f6d2973e..c07ea313f1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -337,7 +337,7 @@ object TestSettings {
javaOptions in Test += "-Dspark.test.home=" + sparkHome,
javaOptions in Test += "-Dspark.testing=1",
javaOptions in Test += "-Dspark.ports.maxRetries=100",
- javaOptions in Test += "-Dspark.ui.port=0",
+ javaOptions in Test += "-Dspark.ui.enabled=false",
javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
.map { case (k,v) => s"-D$k=$v" }.toSeq,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 457e8ab28e..f63560dcb5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.MetadataCleaner
/**
@@ -158,7 +158,14 @@ class StreamingContext private[streaming] (
private[streaming] val waiter = new ContextWaiter
- private[streaming] val uiTab = new StreamingTab(this)
+ private[streaming] val progressListener = new StreamingJobProgressListener(this)
+
+ private[streaming] val uiTab: Option[StreamingTab] =
+ if (conf.getBoolean("spark.ui.enabled", true)) {
+ Some(new StreamingTab(this))
+ } else {
+ None
+ }
/** Register streaming source to metrics system */
private val streamingSource = new StreamingSource(this)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
index 75f0e8716d..e35a568ddf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
override val metricRegistry = new MetricRegistry
override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
- private val streamingListener = ssc.uiTab.listener
+ private val streamingListener = ssc.progressListener
private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
defaultValue: T) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index 34ac254f33..d9d04cd706 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -17,18 +17,31 @@
package org.apache.spark.streaming.ui
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.ui.SparkUITab
+import org.apache.spark.ui.{SparkUI, SparkUITab}
-/** Spark Web UI tab that shows statistics of a streaming job */
+import StreamingTab._
+
+/**
+ * Spark Web UI tab that shows statistics of a streaming job.
+ * This assumes the given SparkContext has enabled its SparkUI.
+ */
private[spark] class StreamingTab(ssc: StreamingContext)
- extends SparkUITab(ssc.sc.ui, "streaming") with Logging {
+ extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
- val parent = ssc.sc.ui
- val listener = new StreamingJobProgressListener(ssc)
+ val parent = getSparkUI(ssc)
+ val listener = ssc.progressListener
ssc.addStreamingListener(listener)
attachPage(new StreamingPage(this))
parent.attachTab(this)
}
+
+private object StreamingTab {
+ def getSparkUI(ssc: StreamingContext): SparkUI = {
+ ssc.sc.ui.getOrElse {
+ throw new SparkException("Parent SparkUI to attach this tab to not found!")
+ }
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
index 2a0db75649..4c7e43c294 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -24,13 +24,22 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
+import org.apache.spark.SparkConf
+
class UISuite extends FunSuite {
// Ignored: See SPARK-1530
ignore("streaming tab in spark UI") {
- val ssc = new StreamingContext("local", "test", Seconds(1))
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test")
+ .set("spark.ui.enabled", "true")
+ val ssc = new StreamingContext(conf, Seconds(1))
+ assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
+ val ui = ssc.sc.ui.get
+
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+ val html = Source.fromURL(ui.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
// test if streaming tab exist
assert(html.toLowerCase.contains("streaming"))
@@ -39,8 +48,7 @@ class UISuite extends FunSuite {
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(
- ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+ val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
assert(html.toLowerCase.contains("batch"))
assert(html.toLowerCase.contains("network"))
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 5756263e89..878b6db546 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -189,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
} else {
- registerAM(sc.ui.appUIAddress, securityMgr)
+ registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
try {
userThread.join()
} finally {
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 41c662cd7a..6aa6475fe4 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -55,7 +55,7 @@ private[spark] class YarnClientSchedulerBackend(
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
- conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
+ sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIHostPort) }
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (