diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-07-29 23:52:09 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-07-29 23:52:09 -0700 |
commit | 4ce92ccaf761e48a10fc4fe4927dbfca858ca22b (patch) | |
tree | af298364e8b7cff17d1fa93c7f60b4883b0fa88c /core/src/main/scala | |
parent | 077f633b4720422c5efbf0382e869ead3dc49612 (diff) | |
download | spark-4ce92ccaf761e48a10fc4fe4927dbfca858ca22b.tar.gz spark-4ce92ccaf761e48a10fc4fe4927dbfca858ca22b.tar.bz2 spark-4ce92ccaf761e48a10fc4fe4927dbfca858ca22b.zip |
[SPARK-2260] Fix standalone-cluster mode, which was broken
The main thing was that spark configs were not propagated to the driver, and so applications that do not specify `master` or `appName` automatically failed. This PR fixes that and a couple of miscellaneous things that are related.
One thing that may or may not be an issue is that the jars must be available on the driver node. In `standalone-cluster` mode, this effectively means these jars must be available on all the worker machines, since the driver is launched on one of them. The semantics here are not the same as `yarn-cluster` mode, where all the relevant jars are uploaded to a distributed cache automatically and shipped to the containers. This is probably not a concern, but still worth a mention.
Author: Andrew Or <andrewor14@gmail.com>
Closes #1538 from andrewor14/standalone-cluster and squashes the following commits:
8c11a0d [Andrew Or] Clean up imports / comments (minor)
2678d13 [Andrew Or] Handle extraJavaOpts properly
7660547 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
6f64a9b [Andrew Or] Revert changes in YARN
2f2908b [Andrew Or] Fix tests
ed01491 [Andrew Or] Don't go overboard with escaping
8e105e1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-cluster
b890949 [Andrew Or] Abstract usages of converting spark opts to java opts
79f63a3 [Andrew Or] Move sparkProps into javaOpts
78752f8 [Andrew Or] Fix tests
5a9c6c7 [Andrew Or] Fix line too long
c141a00 [Andrew Or] Don't display "unknown app" on driver log pages
d7e2728 [Andrew Or] Avoid deprecation warning in standalone Client
6ceb14f [Andrew Or] Allow relevant configs to propagate to standalone Driver
7f854bc [Andrew Or] Fix test
855256e [Andrew Or] Fix standalone-cluster mode
fd9da51 [Andrew Or] Formatting changes (minor)
Diffstat (limited to 'core/src/main/scala')
12 files changed, 83 insertions, 44 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 8ce4b91cae..38700847c8 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -40,6 +40,8 @@ import scala.collection.mutable.HashMap */ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { + import SparkConf._ + /** Create a SparkConf that loads defaults from system properties and the classpath */ def this() = this(true) @@ -198,7 +200,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * * E.g. spark.akka.option.x.y.x = "value" */ - getAll.filter {case (k, v) => k.startsWith("akka.")} + getAll.filter { case (k, _) => isAkkaConf(k) } /** Does the configuration contain a given parameter? */ def contains(key: String): Boolean = settings.contains(key) @@ -292,3 +294,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n") } } + +private[spark] object SparkConf { + /** + * Return whether the given config is an akka config (e.g. akka.actor.provider). + * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout). + */ + def isAkkaConf(name: String): Boolean = name.startsWith("akka.") + + /** + * Return whether the given config should be passed to an executor on start-up. + * + * Certain akka and authentication configs are required of the executor when it connects to + * the scheduler, while the rest of the spark configs can be inherited from the driver later. + */ + def isExecutorStartupConf(name: String): Boolean = { + isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth") + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index c371dc3a51..17c507af26 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -17,8 +17,6 @@ package org.apache.spark.deploy -import scala.collection.JavaConversions._ -import scala.collection.mutable.Map import scala.concurrent._ import akka.actor._ @@ -50,9 +48,6 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends // TODO: We could add an env variable here and intercept it in `sc.addJar` that would // truncate filesystem paths similar to what YARN does. For now, we just require // people call `addJar` assuming the jar is in the same directory. - val env = Map[String, String]() - System.getenv().foreach{case (k, v) => env(k) = v} - val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" val classPathConf = "spark.driver.extraClassPath" @@ -65,10 +60,13 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends cp.split(java.io.File.pathSeparator) } - val javaOptionsConf = "spark.driver.extraJavaOptions" - val javaOpts = sys.props.get(javaOptionsConf) + val extraJavaOptsConf = "spark.driver.extraJavaOptions" + val extraJavaOpts = sys.props.get(extraJavaOptsConf) + .map(Utils.splitCommandString).getOrElse(Seq.empty) + val sparkJavaOpts = Utils.sparkJavaOpts(conf) + val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ - driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts) + driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts) val driverDescription = new DriverDescription( driverArgs.jarUrl, @@ -109,6 +107,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends // Exception, if present statusResponse.exception.map { e => println(s"Exception from cluster was: $e") + e.printStackTrace() System.exit(-1) } System.exit(0) @@ -141,8 +140,10 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends */ object Client { def main(args: Array[String]) { - println("WARNING: This client is deprecated and will be removed in a future version of Spark.") - println("Use ./bin/spark-submit with \"--master spark://host:port\"") + if (!sys.props.contains("SPARK_SUBMIT")) { + println("WARNING: This client is deprecated and will be removed in a future version of Spark") + println("Use ./bin/spark-submit with \"--master spark://host:port\"") + } val conf = new SparkConf() val driverArgs = new ClientArguments(args) diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala index 32f3ba3850..a2b263544c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Command.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala @@ -25,5 +25,5 @@ private[spark] case class Command( environment: Map[String, String], classPathEntries: Seq[String], libraryPathEntries: Seq[String], - extraJavaOptions: Option[String] = None) { + javaOpts: Seq[String]) { } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c9cec33eba..3df811c4ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -136,8 +136,6 @@ object SparkSubmit { (clusterManager, deployMode) match { case (MESOS, CLUSTER) => printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.") - case (STANDALONE, CLUSTER) => - printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.") case (_, CLUSTER) if args.isPython => printErrorAndExit("Cluster deploy mode is currently not supported for python applications.") case (_, CLUSTER) if isShell(args.primaryResource) => @@ -170,9 +168,9 @@ object SparkSubmit { val options = List[OptionAssigner]( // All cluster managers - OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"), - OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"), - OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"), + OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"), + OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"), + OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"), // Standalone cluster only OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"), @@ -203,9 +201,9 @@ object SparkSubmit { sysProp = "spark.driver.extraJavaOptions"), OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER, sysProp = "spark.driver.extraLibraryPath"), - OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT, + OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.memory"), - OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT, + OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.cores.max"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.files") diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index e15a87bd38..b8ffa9afb6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -46,11 +46,11 @@ private[spark] object TestClient { def main(args: Array[String]) { val url = args(0) val conf = new SparkConf - val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, + val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = conf, securityManager = new SecurityManager(conf)) val desc = new ApplicationDescription( - "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), - Seq()), Some("dummy-spark-home"), "ignored") + "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), + Seq(), Seq(), Seq()), Some("dummy-spark-home"), "ignored") val listener = new TestListener val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 4af5bc3afa..687e492a0d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -47,7 +47,6 @@ object CommandUtils extends Logging { */ def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = { val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M") - val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq()) // Exists for backwards compatibility with older Spark versions val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString) @@ -62,7 +61,7 @@ object CommandUtils extends Logging { val joined = command.libraryPathEntries.mkString(File.pathSeparator) Seq(s"-Djava.library.path=$joined") } else { - Seq() + Seq() } val permGenOpt = Seq("-XX:MaxPermSize=128m") @@ -71,11 +70,11 @@ object CommandUtils extends Logging { val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" val classPath = Utils.executeAndGetOutput( Seq(sparkHome + "/bin/compute-classpath" + ext), - extraEnvironment=command.environment) + extraEnvironment = command.environment) val userClassPath = command.classPathEntries ++ Seq(classPath) Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++ - permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts + permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts } /** Spawn a thread that will redirect a given stream to a file */ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 662d37871e..5caaf6bea3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState /** * Manages the execution of one driver, including automatically restarting the driver on failure. + * This is currently only used in standalone cluster deploy mode. */ private[spark] class DriverRunner( val driverId: String, @@ -81,7 +82,7 @@ private[spark] class DriverRunner( driverDesc.command.environment, classPath, driverDesc.command.libraryPathEntries, - driverDesc.command.extraJavaOptions) + driverDesc.command.javaOpts) val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem, sparkHome.getAbsolutePath) launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 467317dd9b..7be89f9aff 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender /** * Manages the execution of one executor process. + * This is currently only used in standalone mode. */ private[spark] class ExecutorRunner( val appId: String, @@ -72,7 +73,7 @@ private[spark] class ExecutorRunner( } /** - * kill executor process, wait for exit and notify worker to update resource status + * Kill executor process, wait for exit and notify worker to update resource status. * * @param message the exception message which caused the executor's death */ @@ -114,10 +115,13 @@ private[spark] class ExecutorRunner( } def getCommandSeq = { - val command = Command(appDesc.command.mainClass, - appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment, - appDesc.command.classPathEntries, appDesc.command.libraryPathEntries, - appDesc.command.extraJavaOptions) + val command = Command( + appDesc.command.mainClass, + appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), + appDesc.command.environment, + appDesc.command.classPathEntries, + appDesc.command.libraryPathEntries, + appDesc.command.javaOpts) CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala index b389cb546d..ecb358c399 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -17,7 +17,6 @@ package org.apache.spark.deploy.worker.ui -import java.io.File import javax.servlet.http.HttpServletRequest import scala.xml.Node @@ -25,7 +24,7 @@ import scala.xml.Node import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils import org.apache.spark.Logging -import org.apache.spark.util.logging.{FileAppender, RollingFileAppender} +import org.apache.spark.util.logging.RollingFileAppender private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging { private val worker = parent.worker @@ -64,11 +63,11 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w val offset = Option(request.getParameter("offset")).map(_.toLong) val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - val (logDir, params) = (appId, executorId, driverId) match { + val (logDir, params, pageName) = (appId, executorId, driverId) match { case (Some(a), Some(e), None) => - (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e") + (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e") case (None, None, Some(d)) => - (s"${workDir.getPath}/$d/", s"driverId=$d") + (s"${workDir.getPath}/$d/", s"driverId=$d", d) case _ => throw new Exception("Request must specify either application or driver identifiers") } @@ -120,7 +119,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w </div> </body> </html> - UIUtils.basicSparkPage(content, logType + " log page for " + appId.getOrElse("unknown app")) + UIUtils.basicSparkPage(content, logType + " log page for " + pageName) } /** Get the part of the log files given the offset and desired length of bytes */ diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b455c9fcf4..860b47e056 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -98,8 +98,13 @@ private[spark] class CoarseGrainedExecutorBackend( } private[spark] object CoarseGrainedExecutorBackend extends Logging { - def run(driverUrl: String, executorId: String, hostname: String, cores: Int, - workerUrl: Option[String]) { + + private def run( + driverUrl: String, + executorId: String, + hostname: String, + cores: Int, + workerUrl: Option[String]) { SignalLogger.register(log) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index bf2dc88e29..48aaaa54bd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster -import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} @@ -46,6 +46,7 @@ private[spark] class SparkDeploySchedulerBackend( CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") + .map(Utils.splitCommandString).getOrElse(Seq.empty) val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } @@ -54,9 +55,11 @@ private[spark] class SparkDeploySchedulerBackend( cp.split(java.io.File.pathSeparator) } - val command = Command( - "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, - classPathEntries, libraryPathEntries, extraJavaOpts) + // Start executors with a few necessary configs for registering with the scheduler + val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) + val javaOpts = sparkJavaOpts ++ extraJavaOpts + val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", + args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts) val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 8cbb9050f3..69f65b4bdc 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1313,4 +1313,13 @@ private[spark] object Utils extends Logging { s"$className: $desc\n$st" } + /** + * Convert all spark properties set in the given SparkConf to a sequence of java options. + */ + def sparkJavaOpts(conf: SparkConf, filterKey: (String => Boolean) = _ => true): Seq[String] = { + conf.getAll + .filter { case (k, _) => filterKey(k) } + .map { case (k, v) => s"-D$k=$v" } + } + } |