From f5e63751f0ed50ceafdc2ec5173b161a5155b646 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Mon, 2 Feb 2015 12:32:28 -0800 Subject: [SPARK-5173]support python application running on yarn cluster mode now when we run python application on yarn cluster mode through spark-submit, spark-submit does not support python application on yarn cluster mode. so i modify code of submit and yarn's AM in order to support it. through specifying .py file or primaryResource file via spark-submit, we can make pyspark run in yarn-cluster mode. example:spark-submit --master yarn-master --num-executors 1 --driver-memory 1g --executor-memory 1g xx.py --primaryResource yy.conf this config is same as pyspark on yarn-client mode. firstly,we put local path of .py or primaryResource to yarn's dist.files.that can be distributed on slave nodes.and then in spark-submit we transfer --py-files and --primaryResource to yarn.Client and use "org.apache.spark.deploy.PythonRunner" to user class that can run .py files on ApplicationMaster. in yarn.Client we transfer --py-files and --primaryResource to ApplicationMaster. in ApplicationMaster, user's class is org.apache.spark.deploy.PythonRunner, and user's args is primaryResource and -py-files. so that can make pyspark run on ApplicationMaster. JoshRosen tgravescs sryza Author: lianhuiwang Author: Wang Lianhui Closes #3976 from lianhuiwang/SPARK-5173 and squashes the following commits: 28a8a58 [lianhuiwang] fix variable name 67f8cee [lianhuiwang] update with andrewor's comments 0319ae3 [lianhuiwang] address with sryza's comments 2385ef6 [lianhuiwang] address with sryza's comments 03640ab [lianhuiwang] add sparkHome to env 47d2fc3 [lianhuiwang] fix test 2adc8f5 [lianhuiwang] add spark.test.home d60bc60 [lianhuiwang] fix test 5b30064 [lianhuiwang] add test 097a5ec [lianhuiwang] fix line length exceeds 100 905a106 [lianhuiwang] update with sryza and andrewor 's comments f1f55b6 [lianhuiwang] when yarn-cluster, all python files can be non-local 172eec1 [Wang Lianhui] fix a min submit's bug 9c941bc [lianhuiwang] support python application running on yarn cluster mode --- .../org/apache/spark/deploy/PythonRunner.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 49 ++++++++++++++++++---- .../apache/spark/deploy/SparkSubmitArguments.scala | 12 ------ .../spark/deploy/yarn/ApplicationMaster.scala | 14 ++++--- .../deploy/yarn/ApplicationMasterArguments.scala | 13 ++++++ .../org/apache/spark/deploy/yarn/Client.scala | 19 ++++++++- .../apache/spark/deploy/yarn/ClientArguments.scala | 20 ++++++--- .../spark/deploy/yarn/YarnClusterSuite.scala | 44 +++++++++++++++++++ 8 files changed, 141 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 039c8719e2..53e18c4bce 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -26,7 +26,7 @@ import org.apache.spark.api.python.PythonUtils import org.apache.spark.util.{RedirectThread, Utils} /** - * A main class used by spark-submit to launch Python applications. It executes python as a + * A main class used to launch Python applications. It executes python as a * subprocess and then has it connect back to the JVM to access system properties, etc. */ object PythonRunner { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c240bcd705..02021be9f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -23,6 +23,8 @@ import java.net.URL import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import org.apache.hadoop.fs.Path + import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.util.Utils @@ -134,12 +136,27 @@ object SparkSubmit { } } + val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER + + // Require all python files to be local, so we can add them to the PYTHONPATH + // In YARN cluster mode, python files are distributed as regular files, which can be non-local + if (args.isPython && !isYarnCluster) { + if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) { + printErrorAndExit(s"Only local python files are supported: $args.primaryResource") + } + val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",") + if (nonLocalPyFiles.nonEmpty) { + printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles") + } + } + // The following modes are not supported or applicable (clusterManager, deployMode) match { case (MESOS, CLUSTER) => printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.") - case (_, CLUSTER) if args.isPython => - printErrorAndExit("Cluster deploy mode is currently not supported for python applications.") + case (STANDALONE, CLUSTER) if args.isPython => + printErrorAndExit("Cluster deploy mode is currently not supported for python " + + "applications on standalone clusters.") case (_, CLUSTER) if isShell(args.primaryResource) => printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.") case (_, CLUSTER) if isSqlShell(args.mainClass) => @@ -150,7 +167,7 @@ object SparkSubmit { } // If we're running a python app, set the main class to our specific python runner - if (args.isPython) { + if (args.isPython && deployMode == CLIENT) { if (args.primaryResource == PYSPARK_SHELL) { args.mainClass = "py4j.GatewayServer" args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0") @@ -167,6 +184,13 @@ object SparkSubmit { } } + // In yarn-cluster mode for a python app, add primary resource and pyFiles to files + // that can be distributed with the job + if (args.isPython && isYarnCluster) { + args.files = mergeFileLists(args.files, args.primaryResource) + args.files = mergeFileLists(args.files, args.pyFiles) + } + // Special flag to avoid deprecation warnings at the client sysProps("SPARK_SUBMIT") = "true" @@ -245,7 +269,6 @@ object SparkSubmit { // Add the application jar automatically so the user doesn't have to call sc.addJar // For YARN cluster mode, the jar is already distributed on each node as "app.jar" // For python files, the primary resource is already distributed as a regular file - val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER if (!isYarnCluster && !args.isPython) { var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty) if (isUserJar(args.primaryResource)) { @@ -270,10 +293,22 @@ object SparkSubmit { // In yarn-cluster mode, use yarn.Client as a wrapper around the user class if (isYarnCluster) { childMainClass = "org.apache.spark.deploy.yarn.Client" - if (args.primaryResource != SPARK_INTERNAL) { - childArgs += ("--jar", args.primaryResource) + if (args.isPython) { + val mainPyFile = new Path(args.primaryResource).getName + childArgs += ("--primary-py-file", mainPyFile) + if (args.pyFiles != null) { + // These files will be distributed to each machine's working directory, so strip the + // path prefix + val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",") + childArgs += ("--py-files", pyFilesNames) + } + childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") + } else { + if (args.primaryResource != SPARK_INTERNAL) { + childArgs += ("--jar", args.primaryResource) + } + childArgs += ("--class", args.mainClass) } - childArgs += ("--class", args.mainClass) if (args.childArgs != null) { args.childArgs.foreach { arg => childArgs += ("--arg", arg) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 81ec08cb6d..73e921fd83 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -179,18 +179,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script") } - // Require all python files to be local, so we can add them to the PYTHONPATH - if (isPython) { - if (Utils.nonLocalPaths(primaryResource).nonEmpty) { - SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource") - } - val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",") - if (nonLocalPyFiles.nonEmpty) { - SparkSubmit.printErrorAndExit( - s"Only local additional python files are supported: $nonLocalPyFiles") - } - } - if (master.startsWith("yarn")) { val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR") if (!hasHadoopEnv && !Utils.isTesting) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index d3e327b249..eb328b2b8a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} import org.apache.spark.SparkException -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil} import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.YarnSchedulerBackend import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -135,7 +135,7 @@ private[spark] class ApplicationMaster( .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY) // Call this to force generation of secret so it gets populated into the - // Hadoop UGI. This has to happen before the startUserClass which does a + // Hadoop UGI. This has to happen before the startUserApplication which does a // doAs in order for the credentials to be passed on to the executor containers. val securityMgr = new SecurityManager(sparkConf) @@ -254,7 +254,7 @@ private[spark] class ApplicationMaster( private def runDriver(securityMgr: SecurityManager): Unit = { addAmIpFilter() - userClassThread = startUserClass() + userClassThread = startUserApplication() // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. @@ -448,9 +448,13 @@ private[spark] class ApplicationMaster( * * Returns the user thread that was started. */ - private def startUserClass(): Thread = { - logInfo("Starting the user JAR in a separate Thread") + private def startUserApplication(): Thread = { + logInfo("Starting the user application in a separate Thread") System.setProperty("spark.executor.instances", args.numExecutors.toString) + if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { + System.setProperty("spark.submit.pyFiles", + PythonRunner.formatPaths(args.pyFiles).mkString(",")) + } val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index d76a63276d..e1a992af3a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -24,6 +24,8 @@ import collection.mutable.ArrayBuffer class ApplicationMasterArguments(val args: Array[String]) { var userJar: String = null var userClass: String = null + var primaryPyFile: String = null + var pyFiles: String = null var userArgs: Seq[String] = Seq[String]() var executorMemory = 1024 var executorCores = 1 @@ -48,6 +50,14 @@ class ApplicationMasterArguments(val args: Array[String]) { userClass = value args = tail + case ("--primary-py-file") :: value :: tail => + primaryPyFile = value + args = tail + + case ("--py-files") :: value :: tail => + pyFiles = value + args = tail + case ("--args" | "--arg") :: value :: tail => userArgsBuffer += value args = tail @@ -81,6 +91,9 @@ class ApplicationMasterArguments(val args: Array[String]) { |Options: | --jar JAR_PATH Path to your application's JAR file | --class CLASS_NAME Name of your application's main class + | --primary-py-file A main Python file + | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to + | place on the PYTHONPATH for Python apps. | --args ARGS Arguments to be passed to your application's main class. | Multiple invocations are possible, each will be passed in order. | --num-executors NUM Number of executors to start (Default: 2) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1a18e6509e..91e8574e94 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,7 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, ListBuffer, Map} +import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map} import scala.util.{Try, Success, Failure} import com.google.common.base.Objects @@ -477,17 +477,32 @@ private[spark] class Client( } else { Nil } + val primaryPyFile = + if (args.primaryPyFile != null) { + Seq("--primary-py-file", args.primaryPyFile) + } else { + Nil + } + val pyFiles = + if (args.pyFiles != null) { + Seq("--py-files", args.pyFiles) + } else { + Nil + } val amClass = if (isClusterMode) { Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName } else { Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName } + if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { + args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs + } val userArgs = args.userArgs.flatMap { arg => Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg)) } val amArgs = - Seq(amClass) ++ userClass ++ userJar ++ userArgs ++ + Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ userArgs ++ Seq( "--executor-memory", args.executorMemory.toString + "m", "--executor-cores", args.executorCores.toString, diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 5eb2023802..3bc7eb1abf 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -30,7 +30,9 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var archives: String = null var userJar: String = null var userClass: String = null - var userArgs: Seq[String] = Seq[String]() + var pyFiles: String = null + var primaryPyFile: String = null + var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]() var executorMemory = 1024 // MB var executorCores = 1 var numExecutors = DEFAULT_NUMBER_EXECUTORS @@ -132,7 +134,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) } private def parseArgs(inputArgs: List[String]): Unit = { - val userArgsBuffer = new ArrayBuffer[String]() var args = inputArgs while (!args.isEmpty) { @@ -145,11 +146,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) userClass = value args = tail + case ("--primary-py-file") :: value :: tail => + primaryPyFile = value + args = tail + case ("--args" | "--arg") :: value :: tail => if (args(0) == "--args") { println("--args is deprecated. Use --arg instead.") } - userArgsBuffer += value + userArgs += value args = tail case ("--master-class" | "--am-class") :: value :: tail => @@ -205,6 +210,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) addJars = value args = tail + case ("--py-files") :: value :: tail => + pyFiles = value + args = tail + case ("--files") :: value :: tail => files = value args = tail @@ -219,8 +228,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) throw new IllegalArgumentException(getUsageMessage(args)) } } - - userArgs = userArgsBuffer.readOnly } private def getUsageMessage(unknownParam: List[String] = null): String = { @@ -232,6 +239,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) | --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster | mode) | --class CLASS_NAME Name of your application's main class (required) + | --primary-py-file A main Python file | --arg ARG Argument to be passed to your application's main class. | Multiple invocations are possible, each will be passed in order. | --num-executors NUM Number of executors to start (Default: 2) @@ -244,6 +252,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) | 'default') | --addJars jars Comma separated list of local jars that want SparkContext.addJar | to work with. + | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to + | place on the PYTHONPATH for Python apps. | --files files Comma separated list of files to be distributed with the job. | --archives archives Comma separated list of archives to be distributed with the job. """.stripMargin diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d79b85e867..7165918e1b 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -45,6 +45,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n """.stripMargin + private val TEST_PYFILE = """ + |import sys + |from operator import add + | + |from pyspark import SparkConf , SparkContext + |if __name__ == "__main__": + | if len(sys.argv) != 3: + | print >> sys.stderr, "Usage: test.py [master] [result file]" + | exit(-1) + | conf = SparkConf() + | conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode") + | sc = SparkContext(conf=conf) + | status = open(sys.argv[2],'w') + | result = "failure" + | rdd = sc.parallelize(range(10)) + | cnt = rdd.count() + | if cnt == 10: + | result = "success" + | status.write(result) + | status.close() + | sc.stop() + """.stripMargin + private var yarnCluster: MiniYARNCluster = _ private var tempDir: File = _ private var fakeSparkJar: File = _ @@ -98,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) + val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) + sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome) + sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome) sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath())) sys.props += ("spark.executor.instances" -> "1") sys.props += ("spark.driver.extraClassPath" -> childClasspath) @@ -146,6 +172,24 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit assert(Utils.exceptionString(exception).contains("Application finished with failed status")) } + test("run Python application in yarn-cluster mode") { + val primaryPyFile = new File(tempDir, "test.py") + Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8) + val pyFile = new File(tempDir, "test2.py") + Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8) + var result = File.createTempFile("result", null, tempDir) + + val args = Array("--class", "org.apache.spark.deploy.PythonRunner", + "--primary-py-file", primaryPyFile.getAbsolutePath(), + "--py-files", pyFile.getAbsolutePath(), + "--arg", "yarn-cluster", + "--arg", result.getAbsolutePath(), + "--name", "python test in yarn-cluster mode", + "--num-executors", "1") + Client.main(args) + checkResult(result) + } + /** * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide * any sort of error when the job process finishes successfully, but the job itself fails. So -- cgit v1.2.3