aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorlianhuiwang <lianhuiwang09@gmail.com>2015-02-02 12:32:28 -0800
committerAndrew Or <andrew@databricks.com>2015-02-02 12:32:28 -0800
commitf5e63751f0ed50ceafdc2ec5173b161a5155b646 (patch)
tree75a199b5fa514ecd48ef634666e4d32d2036ef1b /yarn
parentb2047b55c5fc85de6b63276d8ab9610d2496e08b (diff)
downloadspark-f5e63751f0ed50ceafdc2ec5173b161a5155b646.tar.gz
spark-f5e63751f0ed50ceafdc2ec5173b161a5155b646.tar.bz2
spark-f5e63751f0ed50ceafdc2ec5173b161a5155b646.zip
[SPARK-5173]support python application running on yarn cluster mode
now when we run python application on yarn cluster mode through spark-submit, spark-submit does not support python application on yarn cluster mode. so i modify code of submit and yarn's AM in order to support it. through specifying .py file or primaryResource file via spark-submit, we can make pyspark run in yarn-cluster mode. example:spark-submit --master yarn-master --num-executors 1 --driver-memory 1g --executor-memory 1g xx.py --primaryResource yy.conf this config is same as pyspark on yarn-client mode. firstly,we put local path of .py or primaryResource to yarn's dist.files.that can be distributed on slave nodes.and then in spark-submit we transfer --py-files and --primaryResource to yarn.Client and use "org.apache.spark.deploy.PythonRunner" to user class that can run .py files on ApplicationMaster. in yarn.Client we transfer --py-files and --primaryResource to ApplicationMaster. in ApplicationMaster, user's class is org.apache.spark.deploy.PythonRunner, and user's args is primaryResource and -py-files. so that can make pyspark run on ApplicationMaster. JoshRosen tgravescs sryza Author: lianhuiwang <lianhuiwang09@gmail.com> Author: Wang Lianhui <lianhuiwang09@gmail.com> Closes #3976 from lianhuiwang/SPARK-5173 and squashes the following commits: 28a8a58 [lianhuiwang] fix variable name 67f8cee [lianhuiwang] update with andrewor's comments 0319ae3 [lianhuiwang] address with sryza's comments 2385ef6 [lianhuiwang] address with sryza's comments 03640ab [lianhuiwang] add sparkHome to env 47d2fc3 [lianhuiwang] fix test 2adc8f5 [lianhuiwang] add spark.test.home d60bc60 [lianhuiwang] fix test 5b30064 [lianhuiwang] add test 097a5ec [lianhuiwang] fix line length exceeds 100 905a106 [lianhuiwang] update with sryza and andrewor 's comments f1f55b6 [lianhuiwang] when yarn-cluster, all python files can be non-local 172eec1 [Wang Lianhui] fix a min submit's bug 9c941bc [lianhuiwang] support python application running on yarn cluster mode
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala14
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala13
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala19
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala20
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala44
5 files changed, 98 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d3e327b249..eb328b2b8a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.SparkException
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -135,7 +135,7 @@ private[spark] class ApplicationMaster(
.get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
// Call this to force generation of secret so it gets populated into the
- // Hadoop UGI. This has to happen before the startUserClass which does a
+ // Hadoop UGI. This has to happen before the startUserApplication which does a
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
@@ -254,7 +254,7 @@ private[spark] class ApplicationMaster(
private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
- userClassThread = startUserClass()
+ userClassThread = startUserApplication()
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
@@ -448,9 +448,13 @@ private[spark] class ApplicationMaster(
*
* Returns the user thread that was started.
*/
- private def startUserClass(): Thread = {
- logInfo("Starting the user JAR in a separate Thread")
+ private def startUserApplication(): Thread = {
+ logInfo("Starting the user application in a separate Thread")
System.setProperty("spark.executor.instances", args.numExecutors.toString)
+ if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+ System.setProperty("spark.submit.pyFiles",
+ PythonRunner.formatPaths(args.pyFiles).mkString(","))
+ }
val mainMethod = Class.forName(args.userClass, false,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index d76a63276d..e1a992af3a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -24,6 +24,8 @@ import collection.mutable.ArrayBuffer
class ApplicationMasterArguments(val args: Array[String]) {
var userJar: String = null
var userClass: String = null
+ var primaryPyFile: String = null
+ var pyFiles: String = null
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorCores = 1
@@ -48,6 +50,14 @@ class ApplicationMasterArguments(val args: Array[String]) {
userClass = value
args = tail
+ case ("--primary-py-file") :: value :: tail =>
+ primaryPyFile = value
+ args = tail
+
+ case ("--py-files") :: value :: tail =>
+ pyFiles = value
+ args = tail
+
case ("--args" | "--arg") :: value :: tail =>
userArgsBuffer += value
args = tail
@@ -81,6 +91,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
|Options:
| --jar JAR_PATH Path to your application's JAR file
| --class CLASS_NAME Name of your application's main class
+ | --primary-py-file A main Python file
+ | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
+ | place on the PYTHONPATH for Python apps.
| --args ARGS Arguments to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
| --num-executors NUM Number of executors to start (Default: 2)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1a18e6509e..91e8574e94 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,7 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
import scala.util.{Try, Success, Failure}
import com.google.common.base.Objects
@@ -477,17 +477,32 @@ private[spark] class Client(
} else {
Nil
}
+ val primaryPyFile =
+ if (args.primaryPyFile != null) {
+ Seq("--primary-py-file", args.primaryPyFile)
+ } else {
+ Nil
+ }
+ val pyFiles =
+ if (args.pyFiles != null) {
+ Seq("--py-files", args.pyFiles)
+ } else {
+ Nil
+ }
val amClass =
if (isClusterMode) {
Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
+ if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+ args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
+ }
val userArgs = args.userArgs.flatMap { arg =>
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
- Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
+ Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ userArgs ++
Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 5eb2023802..3bc7eb1abf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -30,7 +30,9 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var archives: String = null
var userJar: String = null
var userClass: String = null
- var userArgs: Seq[String] = Seq[String]()
+ var pyFiles: String = null
+ var primaryPyFile: String = null
+ var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var executorMemory = 1024 // MB
var executorCores = 1
var numExecutors = DEFAULT_NUMBER_EXECUTORS
@@ -132,7 +134,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
}
private def parseArgs(inputArgs: List[String]): Unit = {
- val userArgsBuffer = new ArrayBuffer[String]()
var args = inputArgs
while (!args.isEmpty) {
@@ -145,11 +146,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
userClass = value
args = tail
+ case ("--primary-py-file") :: value :: tail =>
+ primaryPyFile = value
+ args = tail
+
case ("--args" | "--arg") :: value :: tail =>
if (args(0) == "--args") {
println("--args is deprecated. Use --arg instead.")
}
- userArgsBuffer += value
+ userArgs += value
args = tail
case ("--master-class" | "--am-class") :: value :: tail =>
@@ -205,6 +210,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
addJars = value
args = tail
+ case ("--py-files") :: value :: tail =>
+ pyFiles = value
+ args = tail
+
case ("--files") :: value :: tail =>
files = value
args = tail
@@ -219,8 +228,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
throw new IllegalArgumentException(getUsageMessage(args))
}
}
-
- userArgs = userArgsBuffer.readOnly
}
private def getUsageMessage(unknownParam: List[String] = null): String = {
@@ -232,6 +239,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster
| mode)
| --class CLASS_NAME Name of your application's main class (required)
+ | --primary-py-file A main Python file
| --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
| --num-executors NUM Number of executors to start (Default: 2)
@@ -244,6 +252,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| 'default')
| --addJars jars Comma separated list of local jars that want SparkContext.addJar
| to work with.
+ | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
+ | place on the PYTHONPATH for Python apps.
| --files files Comma separated list of files to be distributed with the job.
| --archives archives Comma separated list of archives to be distributed with the job.
""".stripMargin
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d79b85e867..7165918e1b 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -45,6 +45,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
|log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
""".stripMargin
+ private val TEST_PYFILE = """
+ |import sys
+ |from operator import add
+ |
+ |from pyspark import SparkConf , SparkContext
+ |if __name__ == "__main__":
+ | if len(sys.argv) != 3:
+ | print >> sys.stderr, "Usage: test.py [master] [result file]"
+ | exit(-1)
+ | conf = SparkConf()
+ | conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode")
+ | sc = SparkContext(conf=conf)
+ | status = open(sys.argv[2],'w')
+ | result = "failure"
+ | rdd = sc.parallelize(range(10))
+ | cnt = rdd.count()
+ | if cnt == 10:
+ | result = "success"
+ | status.write(result)
+ | status.close()
+ | sc.stop()
+ """.stripMargin
+
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
@@ -98,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+ val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+ sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
+ sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome)
sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
sys.props += ("spark.executor.instances" -> "1")
sys.props += ("spark.driver.extraClassPath" -> childClasspath)
@@ -146,6 +172,24 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
}
+ test("run Python application in yarn-cluster mode") {
+ val primaryPyFile = new File(tempDir, "test.py")
+ Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8)
+ val pyFile = new File(tempDir, "test2.py")
+ Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8)
+ var result = File.createTempFile("result", null, tempDir)
+
+ val args = Array("--class", "org.apache.spark.deploy.PythonRunner",
+ "--primary-py-file", primaryPyFile.getAbsolutePath(),
+ "--py-files", pyFile.getAbsolutePath(),
+ "--arg", "yarn-cluster",
+ "--arg", result.getAbsolutePath(),
+ "--name", "python test in yarn-cluster mode",
+ "--num-executors", "1")
+ Client.main(args)
+ checkResult(result)
+ }
+
/**
* This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
* any sort of error when the job process finishes successfully, but the job itself fails. So