author     Josh Rosen <joshrosen@databricks.com>  2015-03-05 12:04:00 -0800
committer  Josh Rosen <joshrosen@databricks.com>  2015-03-05 12:04:00 -0800
commit     424a86a1ed2a3e6dd54cf8b09fe2f13a1311b7e6 (patch)
tree       b5d5edc4eec007c95f20899731dec7551cf271c4
parent     0bfacd5c5dd7d10a69bcbcbda630f0843d1cf285 (diff)
[SPARK-6175] Fix standalone executor log links when ephemeral ports or SPARK_PUBLIC_DNS are used
This patch fixes two issues with the executor log viewing links added in Spark 1.3. In standalone mode, the log URLs might include a port value of 0 rather than the actual bound port of the UI, which broke the ability to view logs from workers whose web UIs had been configured to bind to ephemeral ports. In addition, the URLs used workers' local hostnames instead of respecting SPARK_PUBLIC_DNS, which prevented this feature from working properly on Spark EC2 clusters because the links would point to internal DNS names instead of external ones.

I included tests for both of these bugs:

- We now browse to the URLs and verify that they point to the expected pages.
- To test SPARK_PUBLIC_DNS, I changed the code that reads the environment variable to do so via `SparkConf.getenv`, then used a custom SparkConf subclass to mock the environment variable (this pattern is used elsewhere in Spark's tests).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #4903 from JoshRosen/SPARK-6175 and squashes the following commits:

5577f41 [Josh Rosen] Remove println
cfec135 [Josh Rosen] Use webUi.boundPort and publicAddress in log links
27918c7 [Josh Rosen] Add failing unit tests for standalone log URL viewing
c250fbe [Josh Rosen] Respect SparkConf in local-cluster Workers.
422a2ef [Josh Rosen] Use conf.getenv to read SPARK_PUBLIC_DNS
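For readers unfamiliar with the mocking pattern the message refers to, the sketch below condenses what the new LogUrlsStandaloneSuite does: override `SparkConf.getenv` so that code reading `SPARK_PUBLIC_DNS` through the conf sees a fake hostname, without touching the real process environment. The "public_dns" value and the MockedDnsExample wrapper are illustrative only; `getenv` is package-private in SparkConf, which is why the subclass (like the real test) needs to live under `org.apache.spark`.

package org.apache.spark.deploy  // keeps the package-private SparkConf.getenv overridable

import org.apache.spark.{SparkConf, SparkContext}

// Override getenv so anything reading SPARK_PUBLIC_DNS through the conf sees a fake
// public hostname; the real process environment is never modified.
class MySparkConf extends SparkConf(false) {
  override def getenv(name: String): String =
    if (name == "SPARK_PUBLIC_DNS") "public_dns" else super.getenv(name)

  // Spark clones confs internally, so the clone must preserve the override.
  override def clone: SparkConf = new MySparkConf().setAll(getAll)
}

// Illustrative driver (not part of the patch): start a local cluster with the mocked
// conf; the executor log URLs should then advertise "public_dns" as their host.
object MockedDnsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local-cluster[2,1,512]", "test", new MySparkConf())
    try {
      sc.parallelize(1 to 100, 4).map(_.toString).count()  // trigger executor registration
    } finally {
      sc.stop()
    }
  }
}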
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 4
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala | 54
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala | 2
8 files changed, 57 insertions, 20 deletions
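To make the shape of the fix concrete before the hunks: the worker now passes ExecutorRunner its SPARK_PUBLIC_DNS-aware `publicAddress` and `webUi.boundPort` (the port the UI actually bound to, since the configured port may be 0 to request an ephemeral port), and the log link is built from those two values. The helper below is a hypothetical, self-contained restatement of that URL construction, not code from the patch:

// Hypothetical sketch of the fixed URL construction; names mirror the patch but this
// helper itself does not exist in Spark.
object ExecutorLogLinks {
  def logUrls(
      publicAddress: String,   // SPARK_PUBLIC_DNS if set, otherwise the worker's host
      boundWebUiPort: Int,     // webUi.boundPort, not the configured (possibly 0) port
      appId: String,
      execId: Int): Map[String, String] = {
    val baseUrl =
      s"http://$publicAddress:$boundWebUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    Map("SPARK_LOG_URL_STDOUT" -> s"${baseUrl}stdout",
        "SPARK_LOG_URL_STDERR" -> s"${baseUrl}stderr")
  }

  def main(args: Array[String]): Unit = {
    // With SPARK_PUBLIC_DNS set to a public hostname and an ephemerally bound UI port,
    // the links now point at an externally reachable address instead of host:0.
    logUrls("ec2-203-0-113-1.compute.example.com", 43215, "app-20150305", 0)
      .foreach { case (k, v) => println(s"$k=$v") }
  }
}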
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 0401b15446..3ab425aab8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -59,7 +59,7 @@ class LocalSparkCluster(
/* Start the Workers */
for (workerNum <- 1 to numWorkers) {
val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
- memoryPerWorker, masters, null, Some(workerNum))
+ memoryPerWorker, masters, null, Some(workerNum), _conf)
workerActorSystems += workerSystem
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4584b730e3..1581429322 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -96,7 +96,7 @@ private[spark] class Master(
val webUi = new MasterWebUI(this, webUiPort)
val masterPublicAddress = {
- val envVar = System.getenv("SPARK_PUBLIC_DNS")
+ val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 066d46c447..023f3c6269 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val webUiPort: Int,
+ val publicAddress: String,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
@@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
- val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ val baseUrl =
+ s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2473a90aa9..f2e7418f4b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -121,7 +121,7 @@ private[spark] class Worker(
val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
val publicAddress = {
- val envVar = System.getenv("SPARK_PUBLIC_DNS")
+ val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}
var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@ private[spark] class Worker(
self,
workerId,
host,
- webUiPort,
+ webUi.boundPort,
+ publicAddress,
sparkHome,
executorDir,
akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
memory: Int,
masterUrls: Array[String],
workDir: String,
- workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+ workerNumber: Option[Int] = None,
+ conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
- val conf = new SparkConf
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val actorName = "Worker"
val securityMgr = new SecurityManager(conf)
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 9be65a4a39..ec68837a15 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -47,7 +47,7 @@ private[spark] abstract class WebUI(
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected var serverInfo: Option[ServerInfo] = None
protected val localHostName = Utils.localHostName()
- protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+ protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
private val className = Utils.getFormattedClassName(this)
def getBasePath: String = basePath
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index e955636cf5..68b5776fc6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
- new File("sparkHome"), new File("workDir"), "akka://worker",
+ "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index f33bdc73e4..54dd7c9c45 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -17,35 +17,69 @@
package org.apache.spark.deploy
+import java.net.URL
+
import scala.collection.mutable
+import scala.io.Source
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
-import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
-class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
/** Length of time to wait while draining listener events. */
- val WAIT_TIMEOUT_MILLIS = 10000
+ private val WAIT_TIMEOUT_MILLIS = 10000
- before {
+ test("verify that correct log urls get propagated from workers") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+ val listener = new SaveExecutorInfo
+ sc.addSparkListener(listener)
+
+ // Trigger a job so that executors get added
+ sc.parallelize(1 to 100, 4).map(_.toString).count()
+
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ listener.addedExecutorInfos.values.foreach { info =>
+ assert(info.logUrlMap.nonEmpty)
+ // Browse to each URL to check that it's valid
+ info.logUrlMap.foreach { case (logType, logUrl) =>
+ val html = Source.fromURL(logUrl).mkString
+ assert(html.contains(s"$logType log page"))
+ }
+ }
}
- test("verify log urls get propagated from workers") {
+ test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
+ val SPARK_PUBLIC_DNS = "public_dns"
+ class MySparkConf extends SparkConf(false) {
+ override def getenv(name: String) = {
+ if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
+ else super.getenv(name)
+ }
+
+ override def clone: SparkConf = {
+ new MySparkConf().setAll(getAll)
+ }
+ }
+ val conf = new MySparkConf()
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
+
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)
- val rdd1 = sc.parallelize(1 to 100, 4)
- val rdd2 = rdd1.map(_.toString)
- rdd2.setName("Target RDD")
- rdd2.count()
+ // Trigger a job so that executors get added
+ sc.parallelize(1 to 100, 4).map(_.toString).count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
+ info.logUrlMap.values.foreach { logUrl =>
+ assert(new URL(logUrl).getHost === SPARK_PUBLIC_DNS)
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 76511699e5..6fca6321e5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -33,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
- new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
+ "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
assert(builder.command().last === appId)