Diffstat (limited to 'core')
9 files changed, 320 insertions, 105 deletions
diff --git a/core/pom.xml b/core/pom.xml
index c24c7be204..8fe215ab24 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -247,6 +247,11 @@
       <artifactId>pyrolite</artifactId>
       <version>2.0.1</version>
     </dependency>
+    <dependency>
+      <groupId>net.sf.py4j</groupId>
+      <artifactId>py4j</artifactId>
+      <version>0.8.1</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d40ed27da5..806e77d98f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import java.io.File
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.concurrent.Await
@@ -304,7 +306,7 @@ object SparkEnv extends Logging {
         k == "java.class.path"
       }.getOrElse(("", ""))
     val classPathEntries = classPathProperty._2
-      .split(conf.get("path.separator", ":"))
+      .split(File.pathSeparator)
       .filterNot(e => e.isEmpty)
       .map(e => (e, "System Classpath"))
     val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
new file mode 100644
index 0000000000..cf69fa1d53
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkContext
+
+private[spark] object PythonUtils {
+  /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
+  def sparkPythonPath: String = {
+    val pythonPath = new ArrayBuffer[String]
+    for (sparkHome <- sys.env.get("SPARK_HOME")) {
+      pythonPath += Seq(sparkHome, "python").mkString(File.separator)
+      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator)
+    }
+    pythonPath ++= SparkContext.jarOfObject(this)
+    pythonPath.mkString(File.pathSeparator)
+  }
+
+  /** Merge PYTHONPATHS with the appropriate separator. Ignores blank strings. */
+  def mergePythonPaths(paths: String*): String = {
+    paths.filter(_ != "").mkString(File.pathSeparator)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 02799ce009..b0bf4e052b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -37,6 +37,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
   var daemonPort: Int = 0
 
+  val pythonPath = PythonUtils.mergePythonPaths(
+    PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", ""))
+
   def create(): Socket = {
     if (useDaemon) {
       createThroughDaemon()
@@ -78,9 +81,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
+      workerEnv.put("PYTHONPATH", pythonPath)
       val worker = pb.start()
 
       // Redirect the worker's stderr to ours
@@ -151,9 +155,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
       try {
         // Create and start the daemon
-        val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
+        val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
         val workerEnv = pb.environment()
         workerEnv.putAll(envVars)
+        workerEnv.put("PYTHONPATH", pythonPath)
         daemon = pb.start()
 
         // Redirect the stderr to ours
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
new file mode 100644
index 0000000000..f2e7c7a508
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{IOException, File, InputStream, OutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.python.PythonUtils
+
+/**
+ * A main class used by spark-submit to launch Python applications. It executes python as a
+ * subprocess and then has it connect back to the JVM to access system properties, etc.
+ */
+object PythonRunner {
+  def main(args: Array[String]) {
+    val primaryResource = args(0)
+    val pyFiles = args(1)
+    val otherArgs = args.slice(2, args.length)
+
+    val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
+
+    // Launch a Py4J gateway server for the process to connect to; this will let it see our
+    // Java system properties and such
+    val gatewayServer = new py4j.GatewayServer(null, 0)
+    gatewayServer.start()
+
+    // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
+    // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
+    val pathElements = new ArrayBuffer[String]
+    pathElements ++= pyFiles.split(",")
+    pathElements += PythonUtils.sparkPythonPath
+    pathElements += sys.env.getOrElse("PYTHONPATH", "")
+    val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
+
+    // Launch Python process
+    val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs)
+    val env = builder.environment()
+    env.put("PYTHONPATH", pythonPath)
+    env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+    val process = builder.start()
+
+    new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+
+    System.exit(process.waitFor())
+  }
+
+  /**
+   * A utility class to redirect the child process's stdout or stderr
+   */
+  class RedirectThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
+    setDaemon(true)
+    override def run() {
+      scala.util.control.Exception.ignoring(classOf[IOException]) {
+        // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+        val buf = new Array[Byte](1024)
+        var len = in.read(buf)
+        while (len != -1) {
+          out.write(buf, 0, len)
+          out.flush()
+          len = in.read(buf)
+        }
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index fb30e8a70f..e39723f383 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -60,11 +60,11 @@ object SparkSubmit {
   private[spark] var exitFn: () => Unit = () => System.exit(-1)
 
   private[spark] def printErrorAndExit(str: String) = {
-    printStream.println("error: " + str)
-    printStream.println("run with --help for more information or --verbose for debugging output")
+    printStream.println("Error: " + str)
+    printStream.println("Run with --help for usage help or --verbose for debug output")
     exitFn()
   }
-  private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
+  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
 
   /**
    * @return
@@ -72,43 +72,43 @@ object SparkSubmit {
    *         entries for the child, a list of system propertes, a list of env vars
    *         and the main class for the child
    */
-  private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
+  private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
       ArrayBuffer[String], Map[String, String], String) = {
-    if (appArgs.master.startsWith("local")) {
+    if (args.master.startsWith("local")) {
       clusterManager = LOCAL
-    } else if (appArgs.master.startsWith("yarn")) {
+    } else if (args.master.startsWith("yarn")) {
       clusterManager = YARN
-    } else if (appArgs.master.startsWith("spark")) {
+    } else if (args.master.startsWith("spark")) {
      clusterManager = STANDALONE
-    } else if (appArgs.master.startsWith("mesos")) {
+    } else if (args.master.startsWith("mesos")) {
       clusterManager = MESOS
     } else {
-      printErrorAndExit("master must start with yarn, mesos, spark, or local")
+      printErrorAndExit("Master must start with yarn, mesos, spark, or local")
     }
 
     // Because "yarn-cluster" and "yarn-client" encapsulate both the master
     // and deploy mode, we have some logic to infer the master and deploy mode
     // from each other if only one is specified, or exit early if they are at odds.
-    if (appArgs.deployMode == null &&
-        (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
-      appArgs.deployMode = "cluster"
+    if (args.deployMode == null &&
+        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
+      args.deployMode = "cluster"
     }
-    if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
+    if (args.deployMode == "cluster" && args.master == "yarn-client") {
       printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
     }
-    if (appArgs.deployMode == "client" &&
-        (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
-      printErrorAndExit("Deploy mode \"client\" and master \"" + appArgs.master
+    if (args.deployMode == "client" &&
+        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
+      printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
         + "\" are not compatible")
     }
-    if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
-      appArgs.master = "yarn-cluster"
+    if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
+      args.master = "yarn-cluster"
     }
-    if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
-      appArgs.master = "yarn-client"
+    if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
+      args.master = "yarn-client"
     }
 
-    val deployOnCluster = Option(appArgs.deployMode).getOrElse("client") == "cluster"
+    val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"
 
     val childClasspath = new ArrayBuffer[String]()
     val childArgs = new ArrayBuffer[String]()
@@ -116,76 +116,93 @@ object SparkSubmit {
     var childMainClass = ""
 
     if (clusterManager == MESOS && deployOnCluster) {
-      printErrorAndExit("Mesos does not support running the driver on the cluster")
+      printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
+    // If we're running a Python app, set the Java class to run to be our PythonRunner, add
+    // Python files to deployment list, and pass the main file and Python path to PythonRunner
+    if (args.isPython) {
+      if (deployOnCluster) {
+        printErrorAndExit("Cannot currently run Python driver programs on cluster")
+      }
+      args.mainClass = "org.apache.spark.deploy.PythonRunner"
+      args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+      val pyFiles = Option(args.pyFiles).getOrElse("")
+      args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
+      args.primaryResource = RESERVED_JAR_NAME
+      sysProps("spark.submit.pyFiles") = pyFiles
+    }
+
+    // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
     if (!deployOnCluster) {
-      childMainClass = appArgs.mainClass
-      if (appArgs.primaryResource != RESERVED_JAR_NAME) {
-        childClasspath += appArgs.primaryResource
+      childMainClass = args.mainClass
+      if (args.primaryResource != RESERVED_JAR_NAME) {
+        childClasspath += args.primaryResource
       }
     } else if (clusterManager == YARN) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
-      childArgs += ("--jar", appArgs.primaryResource)
-      childArgs += ("--class", appArgs.mainClass)
+      childArgs += ("--jar", args.primaryResource)
+      childArgs += ("--class", args.mainClass)
     }
 
+    // Make sure YARN is included in our build if we're trying to use it
     if (clusterManager == YARN) {
-      // The choice of class is arbitrary, could use any spark-yarn class
       if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
-        val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " +
-          "with YARN support."
-        throw new Exception(msg)
+        printErrorAndExit("Could not load YARN classes. " +
+          "This copy of Spark may not have been compiled with YARN support.")
       }
     }
 
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
+    // A list of rules to map each argument to system properties or command-line options in
+    // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
-      new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
-      new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
+      OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraClassPath"),
-      new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraJavaOptions"),
-      new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraLibraryPath"),
-      new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
-      new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
-      new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
-      new OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"),
-      new OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"),
-      new OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
-      new OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"),
-      new OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false,
+      OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
+      OptionAssigner(args.name, YARN, true, clOption = "--name"),
+      OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
+      OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
+      OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
+      OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
+      OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
        sysProp = "spark.executor.memory"),
-      new OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"),
-      new OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"),
-      new OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"),
-      new OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"),
-      new OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false,
+      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
+      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
+      OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
+      OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
        sysProp = "spark.cores.max"),
-      new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
-      new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
-      new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
-      new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
-      new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
-      new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
-      new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false,
-        sysProp = "spark.app.name")
+      OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
+      OptionAssigner(args.files, YARN, true, clOption = "--files"),
+      OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
+      OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
+      OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
+      OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
+      OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name")
     )
 
     // For client mode make any added jars immediately visible on the classpath
-    if (appArgs.jars != null && !deployOnCluster) {
-      for (jar <- appArgs.jars.split(",")) {
+    if (args.jars != null && !deployOnCluster) {
+      for (jar <- args.jars.split(",")) {
         childClasspath += jar
       }
     }
 
+    // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
       if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
-        (clusterManager & opt.clusterManager) != 0) {
+          (clusterManager & opt.clusterManager) != 0) {
         if (opt.clOption != null) {
           childArgs += (opt.clOption, opt.value)
         } else if (opt.sysProp != null) {
@@ -197,32 +214,35 @@ object SparkSubmit {
 
     // For standalone mode, add the application jar automatically so the user doesn't have to
     // call sc.addJar. TODO: Standalone mode in the cluster
     if (clusterManager == STANDALONE) {
-      val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
-      sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
+      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+      if (args.primaryResource != RESERVED_JAR_NAME) {
+        jars = jars ++ Seq(args.primaryResource)
+      }
+      sysProps.put("spark.jars", jars.mkString(","))
     }
 
     if (deployOnCluster && clusterManager == STANDALONE) {
-      if (appArgs.supervise) {
+      if (args.supervise) {
         childArgs += "--supervise"
       }
       childMainClass = "org.apache.spark.deploy.Client"
       childArgs += "launch"
-      childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
+      childArgs += (args.master, args.primaryResource, args.mainClass)
     }
 
     // Arguments to be passed to user program
-    if (appArgs.childArgs != null) {
+    if (args.childArgs != null) {
       if (!deployOnCluster || clusterManager == STANDALONE) {
-        childArgs ++= appArgs.childArgs
+        childArgs ++= args.childArgs
       } else if (clusterManager == YARN) {
-        for (arg <- appArgs.childArgs) {
+        for (arg <- args.childArgs) {
           childArgs += ("--arg", arg)
         }
       }
     }
 
-    for ((k, v) <- appArgs.getDefaultSparkProperties) {
+    for ((k, v) <- args.getDefaultSparkProperties) {
       if (!sysProps.contains(k)) sysProps(k) = v
     }
 
@@ -230,8 +250,8 @@ object SparkSubmit {
   }
 
   private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
-
+      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
+  {
     if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -273,15 +293,26 @@ object SparkSubmit {
     val url = localJarFile.getAbsoluteFile.toURI.toURL
     loader.addURL(url)
   }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+   * no files, into a single comma-separated string.
+   */
+  private[spark] def mergeFileLists(lists: String*): String = {
+    val merged = lists.filter(_ != null)
+      .flatMap(_.split(","))
+      .mkString(",")
+    if (merged == "") null else merged
+  }
 }
 
 /**
  * Provides an indirection layer for passing arguments as system properties or flags to
  * the user's driver program or to downstream launcher tools.
  */
-private[spark] class OptionAssigner(val value: String,
-  val clusterManager: Int,
-  val deployOnCluster: Boolean,
-  val clOption: String = null,
-  val sysProp: String = null
-) { }
+private[spark] case class OptionAssigner(
+    value: String,
+    clusterManager: Int,
+    deployOnCluster: Boolean,
+    clOption: String = null,
+    sysProp: String = null)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 7031cdd9b4..2d327aa3fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
 
 import java.io.{File, FileInputStream, IOException}
 import java.util.Properties
+import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -52,6 +53,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var jars: String = null
   var verbose: Boolean = false
+  var isPython: Boolean = false
+  var pyFiles: String = null
 
   parseOpts(args.toList)
   loadDefaults()
@@ -76,7 +79,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   }
 
   /** Fill in any undefined values based on the current properties file or built-in defaults. */
-  private def loadDefaults() = {
+  private def loadDefaults(): Unit = {
 
     // Use common defaults file, if not specified by user
     if (propertiesFile == null) {
@@ -107,15 +110,43 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     master = Option(master).getOrElse(System.getenv("MASTER"))
     deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
 
+    // Try to set main class from JAR if no --class argument is given
+    if (mainClass == null && !isPython && primaryResource != null) {
+      try {
+        val jar = new JarFile(primaryResource)
+        // Note that this might still return null if no main-class is set; we catch that later
+        mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
+      } catch {
+        case e: Exception =>
+          SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource)
+          return
+      }
+    }
+
     // Global defaults. These should be keep to minimum to avoid confusing behavior.
     master = Option(master).getOrElse("local[*]")
+
+    // Set name from main class if not given
+    name = Option(name).orElse(Option(mainClass)).orNull
+    if (name == null && primaryResource != null) {
+      name = Utils.stripDirectory(primaryResource)
+    }
   }
 
   /** Ensure that required fields exists. Call this only once all defaults are loaded. */
   private def checkRequiredArguments() = {
-    if (args.length == 0) printUsageAndExit(-1)
-    if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
-    if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+    if (args.length == 0) {
+      printUsageAndExit(-1)
+    }
+    if (primaryResource == null) {
+      SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python file)")
+    }
+    if (mainClass == null && !isPython) {
+      SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
+    }
+    if (pyFiles != null && !isPython) {
+      SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
+    }
 
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
@@ -143,6 +174,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     |  queue                   $queue
     |  numExecutors            $numExecutors
     |  files                   $files
+    |  pyFiles                 $pyFiles
     |  archives                $archives
     |  mainClass               $mainClass
     |  primaryResource         $primaryResource
@@ -234,6 +266,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         files = value
         parse(tail)
 
+      case ("--py-files") :: value :: tail =>
+        pyFiles = value
+        parse(tail)
+
       case ("--archives") :: value :: tail =>
         archives = value
         parse(tail)
@@ -260,9 +296,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
             val errMessage = s"Unrecognized option '$value'."
             SparkSubmit.printErrorAndExit(errMessage)
           case v =>
-           primaryResource = v
-           inSparkOpts = false
-           parse(tail)
+            primaryResource = v
+            inSparkOpts = false
+            isPython = v.endsWith(".py")
+            parse(tail)
         }
       } else {
         childArgs += value
@@ -270,7 +307,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       }
 
     case Nil =>
-  }
+    }
   }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
@@ -279,23 +316,26 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
     outStream.println(
-      """Usage: spark-submit [options] <app jar> [app options]
+      """Usage: spark-submit [options] <app jar | python file> [app options]
        |Options:
        |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
-       |  --deploy-mode DEPLOY_MODE   Mode to deploy the app in, either 'client' or 'cluster'.
-       |  --class CLASS_NAME          Name of your app's main class (required for Java apps).
-       |  --name NAME                 The name of your application (Default: 'Spark').
-       |  --jars JARS                 A comma-separated list of local jars to include on the
-       |                              driver classpath and that SparkContext.addJar will work
-       |                              with. Doesn't work on standalone with 'cluster' deploy mode.
-       |  --files FILES               Comma separated list of files to be placed in the working dir
-       |                              of each executor.
+       |  --deploy-mode DEPLOY_MODE   Where to run the driver program: either "client" to run
+       |                              on the local machine, or "cluster" to run inside cluster.
+       |  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
+       |  --name NAME                 A name of your application.
+       |  --jars JARS                 Comma-separated list of local jars to include on the driver
+       |                              and executor classpaths. Doesn't work for drivers in
+       |                              standalone mode with "cluster" deploy mode.
+       |  --py-files PY_FILES         Comma-separated list of .zip or .egg files to place on the
+       |                              PYTHONPATH for Python apps.
+       |  --files FILES               Comma-separated list of files to be placed in the working
+       |                              directory of each executor.
       |  --properties-file FILE      Path to a file from which to load extra properties. If not
       |                              specified, this will look for conf/spark-defaults.conf.
       |
       |  --driver-memory MEM        Memory for driver (e.g. 1000M, 2G) (Default: 512M).
-      |  --driver-java-options      Extra Java options to pass to the driver
-      |  --driver-library-path      Extra library path entries to pass to the driver
+      |  --driver-java-options      Extra Java options to pass to the driver.
+      |  --driver-library-path      Extra library path entries to pass to the driver.
       |  --driver-class-path        Extra class path entries to pass to the driver. Note that
       |                             jars added with --jars are automatically included in the
       |                             classpath.
@@ -311,10 +351,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       |
       | YARN-only:
       |  --executor-cores NUM        Number of cores per executor (Default: 1).
-      |  --queue QUEUE_NAME          The YARN queue to submit to (Default: 'default').
-      |  --num-executors NUM         Number of executors to (Default: 2).
+      |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
+      |  --num-executors NUM         Number of executors to launch (Default: 2).
       |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
-      |                              working dir of each executor.""".stripMargin
+      |                              working directory of each executor.""".stripMargin
     )
     SparkSubmit.exitFn()
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bef4dab3d7..202bd46956 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -46,7 +46,6 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging {
-
   val random = new Random()
 
   def sparkBin(sparkHome: String, which: String): File = {
@@ -1082,4 +1081,11 @@ private[spark] object Utils extends Logging {
   def isTesting = {
     sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
   }
+
+  /**
+   * Strip the directory from a path name
+   */
+  def stripDirectory(path: String): String = {
+    path.split(File.separator).last
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b3541b4a40..d7e3b22ed4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -83,7 +83,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handle binary specified but not class") {
-    testPrematureExit(Array("foo.jar"), "Must specify a main class")
+    testPrematureExit(Array("foo.jar"), "No main class")
   }
 
   test("handles arguments with --key=val") {
@@ -94,9 +94,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles arguments to user program") {
-    val clArgs = Seq("--name", "myApp", "userjar.jar", "some", "--random", "args", "here")
+    val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args")
     val appArgs = new SparkSubmitArguments(clArgs)
-    appArgs.childArgs should be (Seq("some", "--random", "args", "here"))
+    appArgs.childArgs should be (Seq("some", "--weird", "args"))
   }
 
   test("handles YARN cluster mode") {
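The path handling above is small enough to check in isolation. Below is a minimal, self-contained sketch of the merging rule that PythonUtils.mergePythonPaths (and PythonRunner, which reuses it) applies; the concrete entries are illustrative assumptions, not values taken from this patch:

    import java.io.File

    object PythonPathExample {
      // Same rule as PythonUtils.mergePythonPaths above: keep non-blank entries
      // and join them with the platform path separator (":" on Unix, ";" on Windows).
      def mergePythonPaths(paths: String*): String =
        paths.filter(_ != "").mkString(File.pathSeparator)

      def main(args: Array[String]): Unit = {
        val merged = mergePythonPaths(
          "deps.zip",                           // a hypothetical --py-files entry
          "/opt/spark/python",                  // what sparkPythonPath could yield if SPARK_HOME were /opt/spark
          sys.env.getOrElse("PYTHONPATH", ""))  // inherited environment; blank entries are dropped
        println(merged)  // e.g. "deps.zip:/opt/spark/python" on Unix with no inherited PYTHONPATH
      }
    }

This is also the ordering PythonRunner uses, so files passed via --py-files (e.g. spark-submit --py-files deps.zip app.py) take precedence over anything already on the inherited PYTHONPATH.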
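The JVM-to-Python handshake in PythonRunner rests on the py4j dependency added to core/pom.xml. A minimal sketch of that handshake, assuming py4j 0.8.1 on the classpath; it mirrors the GatewayServer calls in the patch but is not part of it:

    import py4j.GatewayServer

    object GatewayExample {
      def main(args: Array[String]): Unit = {
        // Port 0 asks py4j for any free ephemeral port; a null entry point means
        // the Python side works through the gateway's JVM view rather than a
        // specific exported object.
        val gatewayServer = new GatewayServer(null, 0)
        gatewayServer.start()
        // PythonRunner hands this port to the child process through the
        // PYSPARK_GATEWAY_PORT environment variable; the Python side then
        // connects back to this JVM to read system properties and the like.
        println("PYSPARK_GATEWAY_PORT=" + gatewayServer.getListeningPort)
        gatewayServer.shutdown()
      }
    }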