diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-09-11 17:18:46 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-09-11 17:23:16 -0700 |
commit | 2ffc7980c6818eec05e32141c52e335bc71daed9 (patch) | |
tree | 6a0e70a791a293f8a0fc4291eade72deca3eaca6 /yarn/common/src | |
parent | 06fb2d057beb50e9b690bf8b6d5bb7bdb16d8546 (diff) | |
download | spark-2ffc7980c6818eec05e32141c52e335bc71daed9.tar.gz spark-2ffc7980c6818eec05e32141c52e335bc71daed9.tar.bz2 spark-2ffc7980c6818eec05e32141c52e335bc71daed9.zip |
[Spark-3490] Disable SparkUI for tests
We currently open many ephemeral ports during the tests, and as a result we occasionally can't bind to new ones. This has caused the `DriverSuite` and the `SparkSubmitSuite` to fail intermittently.
By disabling the `SparkUI` when it's not needed, we already cut down on the number of ports opened significantly, on the order of the number of `SparkContexts` ever created. We must keep it enabled for a few tests for the UI itself, however.
Author: Andrew Or <andrewor14@gmail.com>
Closes #2363 from andrewor14/disable-ui-for-tests and squashes the following commits:
332a7d5 [Andrew Or] No need to set spark.ui.port to 0 anymore
30c93a2 [Andrew Or] Simplify streaming UISuite
a431b84 [Andrew Or] Fix streaming test failures
8f5ae53 [Andrew Or] Fix no new line at the end
29c9b5b [Andrew Or] Disable SparkUI for tests
(cherry picked from commit 6324eb7b5b0ae005cb2e913e36b1508bd6f1b9b8)
Signed-off-by: Andrew Or <andrewor14@gmail.com>
Conflicts:
pom.xml
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
Diffstat (limited to 'yarn/common/src')
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 443 | ||||
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 6 |
2 files changed, 447 insertions, 2 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala new file mode 100644 index 0000000000..878b6db546 --- /dev/null +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.IOException +import java.net.Socket +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.JavaConversions._ +import scala.util.Try + +import akka.actor._ +import akka.remote._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.util.ShutdownHookManager +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.HistoryServer +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} + +/** + * Common application master functionality for Spark on Yarn. + */ +private[spark] class ApplicationMaster(args: ApplicationMasterArguments, + client: YarnRMClient) extends Logging { + // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be + // optimal as more containers are available. Might need to handle this better. + + private val sparkConf = new SparkConf() + private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf) + .asInstanceOf[YarnConfiguration] + private val isDriver = args.userClass != null + + // Default to numExecutors * 2, with minimum of 3 + private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", + sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) + + @volatile private var finished = false + @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED + + private var reporterThread: Thread = _ + private var allocator: YarnAllocator = _ + + // Fields used in client mode. + private var actorSystem: ActorSystem = null + private var actor: ActorRef = _ + + // Fields used in cluster mode. + private val sparkContextRef = new AtomicReference[SparkContext](null) + + final def run(): Int = { + val appAttemptId = client.getAttemptId() + + if (isDriver) { + // Set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") + + // Set the master property to match the requested mode. + System.setProperty("spark.master", "yarn-cluster") + + // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. + System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) + } + + logInfo("ApplicationAttemptId: " + appAttemptId) + + val cleanupHook = new Runnable { + override def run() { + // If the SparkContext is still registered, shut it down as a best case effort in case + // users do not call sc.stop or do System.exit(). + val sc = sparkContextRef.get() + if (sc != null) { + logInfo("Invoking sc stop from shutdown hook") + sc.stop() + finish(FinalApplicationStatus.SUCCEEDED) + } + + // Cleanup the staging dir after the app is finished, or if it's the last attempt at + // running the AM. + val maxAppAttempts = client.getMaxRegAttempts(yarnConf) + val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts + if (finished || isLastAttempt) { + cleanupStagingDir() + } + } + } + // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using. + ShutdownHookManager.get().addShutdownHook(cleanupHook, 30) + + // Call this to force generation of secret so it gets populated into the + // Hadoop UGI. This has to happen before the startUserClass which does a + // doAs in order for the credentials to be passed on to the executor containers. + val securityMgr = new SecurityManager(sparkConf) + + if (isDriver) { + runDriver(securityMgr) + } else { + runExecutorLauncher(securityMgr) + } + + if (finalStatus != FinalApplicationStatus.UNDEFINED) { + finish(finalStatus) + 0 + } else { + 1 + } + } + + final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized { + if (!finished) { + logInfo(s"Finishing ApplicationMaster with $status" + + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) + finished = true + finalStatus = status + try { + if (Thread.currentThread() != reporterThread) { + reporterThread.interrupt() + reporterThread.join() + } + } finally { + client.shutdown(status, Option(diagnostics).getOrElse("")) + } + } + } + + private def sparkContextInitialized(sc: SparkContext) = { + sparkContextRef.synchronized { + sparkContextRef.compareAndSet(null, sc) + sparkContextRef.notifyAll() + } + } + + private def sparkContextStopped(sc: SparkContext) = { + sparkContextRef.compareAndSet(sc, null) + } + + private def registerAM(uiAddress: String, securityMgr: SecurityManager) = { + val sc = sparkContextRef.get() + + val appId = client.getAttemptId().getApplicationId().toString() + val historyAddress = + sparkConf.getOption("spark.yarn.historyServer.address") + .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" } + .getOrElse("") + + allocator = client.register(yarnConf, + if (sc != null) sc.getConf else sparkConf, + if (sc != null) sc.preferredNodeLocationData else Map(), + uiAddress, + historyAddress, + securityMgr) + + allocator.allocateResources() + reporterThread = launchReporterThread() + } + + private def runDriver(securityMgr: SecurityManager): Unit = { + addAmIpFilter() + val userThread = startUserClass() + + // This a bit hacky, but we need to wait until the spark.driver.port property has + // been set by the Thread executing the user class. + val sc = waitForSparkContextInitialized() + + // If there is no SparkContext at this point, just fail the app. + if (sc == null) { + finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.") + } else { + registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) + try { + userThread.join() + } finally { + // In cluster mode, ask the reporter thread to stop since the user app is finished. + reporterThread.interrupt() + } + } + } + + private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { + actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = sparkConf, securityManager = securityMgr)._1 + actor = waitForSparkDriver() + addAmIpFilter() + registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) + + // In client mode the actor will stop the reporter thread. + reporterThread.join() + finalStatus = FinalApplicationStatus.SUCCEEDED + } + + private def launchReporterThread(): Thread = { + // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. + val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) + + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) + + // must be <= expiryInterval / 2. + val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval)) + + val t = new Thread { + override def run() { + while (!finished) { + checkNumExecutorsFailed() + if (!finished) { + logDebug("Sending progress") + allocator.allocateResources() + try { + Thread.sleep(interval) + } catch { + case e: InterruptedException => + } + } + } + } + } + // setting to daemon status, though this is usually not a good idea. + t.setDaemon(true) + t.setName("Reporter") + t.start() + logInfo("Started progress reporter thread - sleep time : " + interval) + t + } + + /** + * Clean up the staging directory. + */ + private def cleanupStagingDir() { + val fs = FileSystem.get(yarnConf) + var stagingDirPath: Path = null + try { + val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean + if (!preserveFiles) { + stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) + if (stagingDirPath == null) { + logError("Staging directory is null") + return + } + logInfo("Deleting staging directory " + stagingDirPath) + fs.delete(stagingDirPath, true) + } + } catch { + case ioe: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, ioe) + } + } + + private def waitForSparkContextInitialized(): SparkContext = { + logInfo("Waiting for spark context initialization") + try { + sparkContextRef.synchronized { + var count = 0 + val waitTime = 10000L + val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10) + while (sparkContextRef.get() == null && count < numTries && !finished) { + logInfo("Waiting for spark context initialization ... " + count) + count = count + 1 + sparkContextRef.wait(waitTime) + } + + val sparkContext = sparkContextRef.get() + assert(sparkContext != null || count >= numTries) + if (sparkContext == null) { + logError( + "Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".format( + count * waitTime, numTries)) + } + sparkContext + } + } + } + + private def waitForSparkDriver(): ActorRef = { + logInfo("Waiting for Spark driver to be reachable.") + var driverUp = false + val hostport = args.userArgs(0) + val (driverHost, driverPort) = Utils.parseHostPort(hostport) + while (!driverUp) { + try { + val socket = new Socket(driverHost, driverPort) + socket.close() + logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) + driverUp = true + } catch { + case e: Exception => + logError("Failed to connect to driver at %s:%s, retrying ...". + format(driverHost, driverPort)) + Thread.sleep(100) + } + } + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) + + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + driverHost, + driverPort.toString, + CoarseGrainedSchedulerBackend.ACTOR_NAME) + actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") + } + + private def checkNumExecutorsFailed() = { + if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.") + + val sc = sparkContextRef.get() + if (sc != null) { + logInfo("Invoking sc stop from checkNumExecutorsFailed") + sc.stop() + } + } + } + + /** Add the Yarn IP filter that is required for properly securing the UI. */ + private def addAmIpFilter() = { + val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + val proxy = client.getProxyHostAndPort(yarnConf) + val parts = proxy.split(":") + val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + val uriBase = "http://" + proxy + proxyBase + val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase + + if (isDriver) { + System.setProperty("spark.ui.filters", amFilter) + System.setProperty(s"spark.$amFilter.params", params) + } else { + actor ! AddWebUIFilter(amFilter, params, proxyBase) + } + } + + private def startUserClass(): Thread = { + logInfo("Starting the user JAR in a separate Thread") + System.setProperty("spark.executor.instances", args.numExecutors.toString) + val mainMethod = Class.forName(args.userClass, false, + Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) + + val t = new Thread { + override def run() { + var status = FinalApplicationStatus.FAILED + try { + // Copy + val mainArgs = new Array[String](args.userArgs.size) + args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) + mainMethod.invoke(null, mainArgs) + // Some apps have "System.exit(0)" at the end. The user thread will stop here unless + // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED. + status = FinalApplicationStatus.SUCCEEDED + } finally { + logDebug("Finishing main") + } + finalStatus = status + } + } + t.setName("Driver") + t.start() + t + } + + // Actor used to monitor the driver when running in client deploy mode. + private class MonitorActor(driverUrl: String) extends Actor { + + var driver: ActorSelection = _ + + override def preStart() = { + logInfo("Listen to driver: " + driverUrl) + driver = context.actorSelection(driverUrl) + // Send a hello message to establish the connection, after which + // we can monitor Lifecycle Events. + driver ! "Hello" + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + } + + override def receive = { + case x: DisassociatedEvent => + logInfo(s"Driver terminated or disconnected! Shutting down. $x") + finish(FinalApplicationStatus.SUCCEEDED) + case x: AddWebUIFilter => + logInfo(s"Add WebUI Filter. $x") + driver ! x + } + + } + +} + +object ApplicationMaster extends Logging { + + private var master: ApplicationMaster = _ + + def main(args: Array[String]) = { + SignalLogger.register(log) + val amArgs = new ApplicationMasterArguments(args) + SparkHadoopUtil.get.runAsSparkUser { () => + master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) + System.exit(master.run()) + } + } + + private[spark] def sparkContextInitialized(sc: SparkContext) = { + master.sparkContextInitialized(sc) + } + + private[spark] def sparkContextStopped(sc: SparkContext) = { + master.sparkContextStopped(sc) + } + +} + +/** + * This object does not provide any special functionality. It exists so that it's easy to tell + * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. + */ +object ExecutorLauncher { + + def main(args: Array[String]) = { + ApplicationMaster.main(args) + } + +} diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 833e249f9f..40d9bffe8e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -55,8 +55,10 @@ private[spark] class YarnClientSchedulerBackend( val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort - conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort) - conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf)) + sc.ui.foreach { ui => + conf.set("spark.driver.appUIAddress", ui.appUIHostPort) + conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf)) + } val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ( |