From f5e63751f0ed50ceafdc2ec5173b161a5155b646 Mon Sep 17 00:00:00 2001
From: lianhuiwang
Date: Mon, 2 Feb 2015 12:32:28 -0800
Subject: [SPARK-5173] Support Python applications running in yarn-cluster mode

Currently spark-submit cannot run a Python application in yarn-cluster mode, so this patch modifies the submit code and YARN's ApplicationMaster to support it. By specifying a .py file as the primary resource via spark-submit, PySpark can now run in yarn-cluster mode. Example (here xx.py is the primary resource and yy.conf is passed through as an application argument):

  spark-submit --master yarn-cluster --num-executors 1 --driver-memory 1g --executor-memory 1g xx.py yy.conf

The configuration is the same as for PySpark in yarn-client mode.

First, the local paths of the .py file and the primary resource are added to YARN's dist.files so they can be distributed to the slave nodes. Then spark-submit passes --py-files and the primary resource on to yarn.Client and sets "org.apache.spark.deploy.PythonRunner" as the user class, which can run .py files on the ApplicationMaster. yarn.Client in turn forwards --py-files and the primary resource to the ApplicationMaster. In the ApplicationMaster the user class is org.apache.spark.deploy.PythonRunner, and the user args are the primary resource and --py-files, which lets PySpark run on the ApplicationMaster.

JoshRosen tgravescs sryza

Author: lianhuiwang
Author: Wang Lianhui

Closes #3976 from lianhuiwang/SPARK-5173 and squashes the following commits:

28a8a58 [lianhuiwang] fix variable name
67f8cee [lianhuiwang] update with andrewor's comments
0319ae3 [lianhuiwang] address with sryza's comments
2385ef6 [lianhuiwang] address with sryza's comments
03640ab [lianhuiwang] add sparkHome to env
47d2fc3 [lianhuiwang] fix test
2adc8f5 [lianhuiwang] add spark.test.home
d60bc60 [lianhuiwang] fix test
5b30064 [lianhuiwang] add test
097a5ec [lianhuiwang] fix line length exceeds 100
905a106 [lianhuiwang] update with sryza and andrewor 's comments
f1f55b6 [lianhuiwang] when yarn-cluster, all python files can be non-local
172eec1 [Wang Lianhui] fix a min submit's bug
9c941bc [lianhuiwang] support python application running on yarn cluster mode
---
 .../spark/deploy/yarn/ApplicationMaster.scala      | 14 ++++---
 .../deploy/yarn/ApplicationMasterArguments.scala   | 13 +++++++
 .../org/apache/spark/deploy/yarn/Client.scala      | 19 +++++++++-
 .../apache/spark/deploy/yarn/ClientArguments.scala | 20 +++++++---
 .../spark/deploy/yarn/YarnClusterSuite.scala       | 44 ++++++++++++++++++++++
 5 files changed, 98 insertions(+), 12 deletions(-)
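To orient the reader before the diffs: the whole change is an argument translation. A Python submission is rewritten into a yarn.Client invocation whose user class is PythonRunner. The Scala sketch below mirrors that translation; the flag names match the patch, but the object, method, and file names are illustrative placeholders, not Spark internals.

object PySparkSubmitSketch {
  // Rewrite a Python submission into the argument list handed to yarn.Client:
  // PythonRunner becomes the "user class", the .py file travels as
  // --primary-py-file, and the remaining app args are each wrapped in --arg.
  def toClientArgs(primaryPyFile: String,
                   pyFiles: Seq[String],
                   appArgs: Seq[String]): Seq[String] =
    Seq("--class", "org.apache.spark.deploy.PythonRunner",
      "--primary-py-file", primaryPyFile,
      "--py-files", pyFiles.mkString(",")) ++
      appArgs.flatMap(arg => Seq("--arg", arg))

  def main(args: Array[String]): Unit =
    // The commit message's example: xx.py is the primary resource, yy.conf an app arg.
    println(toClientArgs("xx.py", Seq("lib.py"), Seq("yy.conf")).mkString(" "))
}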
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d3e327b249..eb328b2b8a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -135,7 +135,7 @@ private[spark] class ApplicationMaster(
       .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)

     // Call this to force generation of secret so it gets populated into the
-    // Hadoop UGI. This has to happen before the startUserClass which does a
+    // Hadoop UGI. This has to happen before the startUserApplication which does a
     // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)
@@ -254,7 +254,7 @@ private[spark] class ApplicationMaster(
   private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
-    userClassThread = startUserClass()
+    userClassThread = startUserApplication()

     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
@@ -448,9 +448,13 @@ private[spark] class ApplicationMaster(
    *
    * Returns the user thread that was started.
    */
-  private def startUserClass(): Thread = {
-    logInfo("Starting the user JAR in a separate Thread")
+  private def startUserApplication(): Thread = {
+    logInfo("Starting the user application in a separate Thread")
     System.setProperty("spark.executor.instances", args.numExecutors.toString)
+    if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+      System.setProperty("spark.submit.pyFiles",
+        PythonRunner.formatPaths(args.pyFiles).mkString(","))
+    }
     val mainMethod = Class.forName(args.userClass, false,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index d76a63276d..e1a992af3a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -24,6 +24,8 @@ import collection.mutable.ArrayBuffer
 class ApplicationMasterArguments(val args: Array[String]) {
   var userJar: String = null
   var userClass: String = null
+  var primaryPyFile: String = null
+  var pyFiles: String = null
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
   var executorCores = 1
@@ -48,6 +50,14 @@ class ApplicationMasterArguments(val args: Array[String]) {
           userClass = value
           args = tail

+        case ("--primary-py-file") :: value :: tail =>
+          primaryPyFile = value
+          args = tail
+
+        case ("--py-files") :: value :: tail =>
+          pyFiles = value
+          args = tail
+
         case ("--args" | "--arg") :: value :: tail =>
           userArgsBuffer += value
           args = tail
@@ -81,6 +91,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
      |Options:
      |  --jar JAR_PATH       Path to your application's JAR file
      |  --class CLASS_NAME   Name of your application's main class
+     |  --primary-py-file    A main Python file
+     |  --py-files PY_FILES  Comma-separated list of .zip, .egg, or .py files to
+     |                       place on the PYTHONPATH for Python apps.
      |  --args ARGS          Arguments to be passed to your application's main class.
      |                       Multiple invocations are possible, each will be passed in order.
      |  --num-executors NUM    Number of executors to start (Default: 2)
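The two new AM flags are consumed with the same List-pattern idiom these argument classes already use: match a flag together with its value, then recurse on the remaining tail. A self-contained reduction of that idiom (the object and map keys are illustrative, not the AM's actual state):

object FlagParseSketch {
  // Consume a flag list two tokens at a time via list pattern matching.
  def parse(args: List[String],
            acc: Map[String, String] = Map.empty): Map[String, String] =
    args match {
      case "--primary-py-file" :: value :: tail => parse(tail, acc + ("primaryPyFile" -> value))
      case "--py-files" :: value :: tail        => parse(tail, acc + ("pyFiles" -> value))
      case Nil                                  => acc
      case unknown :: _ =>
        throw new IllegalArgumentException("Unknown argument: " + unknown)
    }

  def main(args: Array[String]): Unit =
    println(parse(List("--primary-py-file", "main.py", "--py-files", "deps.zip,util.py")))
}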
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1a18e6509e..91e8574e94 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,7 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer

 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
 import scala.util.{Try, Success, Failure}

 import com.google.common.base.Objects
@@ -477,17 +477,32 @@ private[spark] class Client(
       } else {
         Nil
       }
+    val primaryPyFile =
+      if (args.primaryPyFile != null) {
+        Seq("--primary-py-file", args.primaryPyFile)
+      } else {
+        Nil
+      }
+    val pyFiles =
+      if (args.pyFiles != null) {
+        Seq("--py-files", args.pyFiles)
+      } else {
+        Nil
+      }
     val amClass =
       if (isClusterMode) {
         Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
       } else {
         Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
       }
+    if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+      args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
+    }
     val userArgs = args.userArgs.flatMap { arg =>
       Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
     }
     val amArgs =
-      Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
+      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ userArgs ++
       Seq(
         "--executor-memory", args.executorMemory.toString + "m",
         "--executor-cores", args.executorCores.toString,
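Two things are happening in the Client.scala hunk above: each optional flag reduces to either a two-element Seq or Nil, so the final AM command line is plain concatenation, and for a .py primary resource the file and its py-files list are prepended to the user args, since PythonRunner reads them as its first two arguments. A standalone sketch of that assembly (the method and argument names are illustrative, and shell escaping is omitted):

object AmArgsSketch {
  def amArgs(primaryPyFile: Option[String],
             pyFiles: Option[String],
             userArgs: Seq[String]): Seq[String] = {
    // Optional flags become Seq(flag, value) or Nil, so concatenation suffices.
    val primary = primaryPyFile.map(p => Seq("--primary-py-file", p)).getOrElse(Nil)
    val py = pyFiles.map(p => Seq("--py-files", p)).getOrElse(Nil)
    // For a Python app, PythonRunner expects the primary file and the py-files
    // list ahead of the application's own arguments.
    val allUserArgs = primaryPyFile match {
      case Some(p) if p.endsWith(".py") => Seq(p, pyFiles.getOrElse("")) ++ userArgs
      case _                            => userArgs
    }
    Seq("org.apache.spark.deploy.yarn.ApplicationMaster") ++ primary ++ py ++
      allUserArgs.flatMap(arg => Seq("--arg", arg))
  }

  def main(args: Array[String]): Unit =
    println(amArgs(Some("xx.py"), Some("lib.py"), Seq("yy.conf")).mkString(" "))
}

Building each optional flag as Seq-or-Nil keeps the call site free of null checks; the patch achieves the same effect with null-checked ifs.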
Use --arg instead.") } - userArgsBuffer += value + userArgs += value args = tail case ("--master-class" | "--am-class") :: value :: tail => @@ -205,6 +210,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) addJars = value args = tail + case ("--py-files") :: value :: tail => + pyFiles = value + args = tail + case ("--files") :: value :: tail => files = value args = tail @@ -219,8 +228,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) throw new IllegalArgumentException(getUsageMessage(args)) } } - - userArgs = userArgsBuffer.readOnly } private def getUsageMessage(unknownParam: List[String] = null): String = { @@ -232,6 +239,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) | --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster | mode) | --class CLASS_NAME Name of your application's main class (required) + | --primary-py-file A main Python file | --arg ARG Argument to be passed to your application's main class. | Multiple invocations are possible, each will be passed in order. | --num-executors NUM Number of executors to start (Default: 2) @@ -244,6 +252,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) | 'default') | --addJars jars Comma separated list of local jars that want SparkContext.addJar | to work with. + | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to + | place on the PYTHONPATH for Python apps. | --files files Comma separated list of files to be distributed with the job. | --archives archives Comma separated list of archives to be distributed with the job. """.stripMargin diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d79b85e867..7165918e1b 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -45,6 +45,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n """.stripMargin + private val TEST_PYFILE = """ + |import sys + |from operator import add + | + |from pyspark import SparkConf , SparkContext + |if __name__ == "__main__": + | if len(sys.argv) != 3: + | print >> sys.stderr, "Usage: test.py [master] [result file]" + | exit(-1) + | conf = SparkConf() + | conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode") + | sc = SparkContext(conf=conf) + | status = open(sys.argv[2],'w') + | result = "failure" + | rdd = sc.parallelize(range(10)) + | cnt = rdd.count() + | if cnt == 10: + | result = "success" + | status.write(result) + | status.close() + | sc.stop() + """.stripMargin + private var yarnCluster: MiniYARNCluster = _ private var tempDir: File = _ private var fakeSparkJar: File = _ @@ -98,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) + val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) + sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome) + sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome) sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath())) sys.props += ("spark.executor.instances" -> "1") sys.props += ("spark.driver.extraClassPath" -> 
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d79b85e867..7165918e1b 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -45,6 +45,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
     """.stripMargin

+  private val TEST_PYFILE = """
+    |import sys
+    |from operator import add
+    |
+    |from pyspark import SparkConf, SparkContext
+    |if __name__ == "__main__":
+    |    if len(sys.argv) != 3:
+    |        print >> sys.stderr, "Usage: test.py [master] [result file]"
+    |        exit(-1)
+    |    conf = SparkConf()
+    |    conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode")
+    |    sc = SparkContext(conf=conf)
+    |    status = open(sys.argv[2], 'w')
+    |    result = "failure"
+    |    rdd = sc.parallelize(range(10))
+    |    cnt = rdd.count()
+    |    if cnt == 10:
+    |        result = "success"
+    |    status.write(result)
+    |    status.close()
+    |    sc.stop()
+    """.stripMargin
+
   private var yarnCluster: MiniYARNCluster = _
   private var tempDir: File = _
   private var fakeSparkJar: File = _
@@ -98,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     }

     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+    sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
+    sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome)
     sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
     sys.props += ("spark.executor.instances" -> "1")
     sys.props += ("spark.driver.extraClassPath" -> childClasspath)
@@ -146,6 +172,24 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
   }

+  test("run Python application in yarn-cluster mode") {
+    val primaryPyFile = new File(tempDir, "test.py")
+    Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8)
+    val pyFile = new File(tempDir, "test2.py")
+    Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8)
+    var result = File.createTempFile("result", null, tempDir)
+
+    val args = Array("--class", "org.apache.spark.deploy.PythonRunner",
+      "--primary-py-file", primaryPyFile.getAbsolutePath(),
+      "--py-files", pyFile.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--arg", result.getAbsolutePath(),
+      "--name", "python test in yarn-cluster mode",
+      "--num-executors", "1")
+    Client.main(args)
+    checkResult(result)
+  }
+
   /**
    * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
    * any sort of error when the job process finishes successfully, but the job itself fails. So
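For context on checkResult in the test above: the suite works around the fact that yarn-cluster mode reports success even when the driver itself fails, so the Python job writes "success" or "failure" into a result file that the test inspects afterwards. A minimal sketch of that check, assuming the same Guava helpers the suite imports (the suite's actual helper body may differ):

import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files

object ResultCheckSketch {
  // Read the status file the Python job wrote and fail loudly otherwise.
  def checkResult(result: File): Unit = {
    val contents = Files.toString(result, Charsets.UTF_8)
    assert(contents == "success", "Job reported: " + contents)
  }
}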