author    lianhuiwang <lianhuiwang09@gmail.com>        2015-02-02 12:32:28 -0800
committer Andrew Or <andrew@databricks.com>            2015-02-02 12:32:28 -0800
commit    f5e63751f0ed50ceafdc2ec5173b161a5155b646 (patch)
tree      75a199b5fa514ecd48ef634666e4d32d2036ef1b /core
parent    b2047b55c5fc85de6b63276d8ab9610d2496e08b (diff)
[SPARK-5173] Support Python applications running in YARN cluster mode

Currently, when we submit a Python application through spark-submit in YARN cluster mode, spark-submit does not support it. This change modifies the submit code and YARN's ApplicationMaster so that, by specifying a .py file (or another primary resource) via spark-submit, PySpark can run in yarn-cluster mode. Example:

spark-submit --master yarn-cluster --num-executors 1 --driver-memory 1g --executor-memory 1g xx.py --primaryResource yy.conf

The configuration is the same as for PySpark in yarn-client mode.

First, the local path of the .py file or primary resource is added to YARN's dist.files so that it can be distributed to the slave nodes. Then, in spark-submit, --py-files and the primary resource are passed on to yarn.Client, and "org.apache.spark.deploy.PythonRunner" is used as the user class so that .py files can be run on the ApplicationMaster. yarn.Client in turn forwards --py-files and the primary resource to the ApplicationMaster, where the user class is org.apache.spark.deploy.PythonRunner and the user args are the primary resource and --py-files. This allows PySpark to run on the ApplicationMaster.

JoshRosen tgravescs sryza

Author: lianhuiwang <lianhuiwang09@gmail.com>
Author: Wang Lianhui <lianhuiwang09@gmail.com>

Closes #3976 from lianhuiwang/SPARK-5173 and squashes the following commits:

28a8a58 [lianhuiwang] fix variable name
67f8cee [lianhuiwang] update with andrewor's comments
0319ae3 [lianhuiwang] address sryza's comments
2385ef6 [lianhuiwang] address sryza's comments
03640ab [lianhuiwang] add sparkHome to env
47d2fc3 [lianhuiwang] fix test
2adc8f5 [lianhuiwang] add spark.test.home
d60bc60 [lianhuiwang] fix test
5b30064 [lianhuiwang] add test
097a5ec [lianhuiwang] fix line length exceeding 100
905a106 [lianhuiwang] update with sryza's and andrewor's comments
f1f55b6 [lianhuiwang] when yarn-cluster, all python files can be non-local
172eec1 [Wang Lianhui] fix a minor submit bug
9c941bc [lianhuiwang] support python application running on yarn cluster mode
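To make the argument routing concrete, here is a minimal, self-contained Scala sketch of how the yarn.Client child arguments could be assembled for a Python application in yarn-cluster mode. It is illustrative only, not the actual SparkSubmit code: baseName stands in for org.apache.hadoop.fs.Path#getName, and the flag names mirror those added in the diff below.

import scala.collection.mutable.ArrayBuffer

// Illustrative only: mirrors the routing described above, assuming distributed
// files end up in the container's working directory under their bare names.
object YarnClusterPythonArgsSketch {
  // Stand-in for org.apache.hadoop.fs.Path#getName: keep only the final path component.
  private def baseName(path: String): String = path.split("/").last

  def buildChildArgs(primaryResource: String, pyFiles: Option[String]): List[String] = {
    val childArgs = ArrayBuffer[String]()
    // The primary .py file is distributed with the job, so only its name is passed on.
    childArgs ++= Seq("--primary-py-file", baseName(primaryResource))
    // Additional python files are likewise reduced to bare file names.
    pyFiles.foreach { files =>
      childArgs ++= Seq("--py-files", files.split(",").map(baseName).mkString(","))
    }
    // PythonRunner becomes the user class that the ApplicationMaster launches.
    childArgs ++= Seq("--class", "org.apache.spark.deploy.PythonRunner")
    childArgs.toList
  }

  def main(args: Array[String]): Unit = {
    // Prints: List(--primary-py-file, xx.py, --py-files, libs.zip,utils.py, --class, org.apache.spark.deploy.PythonRunner)
    println(buildChildArgs("hdfs:///user/alice/xx.py", Some("hdfs:///user/alice/libs.zip,utils.py")))
  }
}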
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala         |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala          | 49
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 12
3 files changed, 43 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 039c8719e2..53e18c4bce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -26,7 +26,7 @@ import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}
/**
- * A main class used by spark-submit to launch Python applications. It executes python as a
+ * A main class used to launch Python applications. It executes python as a
* subprocess and then has it connect back to the JVM to access system properties, etc.
*/
object PythonRunner {
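For context, a rough sketch of the pattern PythonRunner follows: launch the python interpreter on the primary script as a subprocess and hand it enough information to connect back to the JVM. This is a guess at the shape of the mechanism, not the actual implementation; the PYSPARK_GATEWAY_PORT variable name is used here only as an illustrative way to pass the callback port to the child process.

import scala.sys.process._

object PythonSubprocessSketch {
  // Launch the python interpreter on the primary script and return its exit code.
  // Output is forwarded to the parent JVM's stdout/stderr.
  def run(pythonExec: String, primaryPyFile: String, userArgs: Seq[String], gatewayPort: Int): Int = {
    val cmd = Seq(pythonExec, primaryPyFile) ++ userArgs
    // Pass the port through the environment so the python side knows where to connect back.
    Process(cmd, None, "PYSPARK_GATEWAY_PORT" -> gatewayPort.toString).!
  }
}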
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c240bcd705..02021be9f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -23,6 +23,8 @@ import java.net.URL
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils
@@ -134,12 +136,27 @@ object SparkSubmit {
}
}
+ val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+
+ // Require all python files to be local, so we can add them to the PYTHONPATH
+ // In YARN cluster mode, python files are distributed as regular files, which can be non-local
+ if (args.isPython && !isYarnCluster) {
+ if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
+ printErrorAndExit(s"Only local python files are supported: $args.primaryResource")
+ }
+ val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
+ if (nonLocalPyFiles.nonEmpty) {
+ printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
+ }
+ }
+
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
- case (_, CLUSTER) if args.isPython =>
- printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
+ case (STANDALONE, CLUSTER) if args.isPython =>
+ printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+ "applications on standalone clusters.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
@@ -150,7 +167,7 @@ object SparkSubmit {
}
// If we're running a python app, set the main class to our specific python runner
- if (args.isPython) {
+ if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -167,6 +184,13 @@ object SparkSubmit {
}
}
+ // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
+ // that can be distributed with the job
+ if (args.isPython && isYarnCluster) {
+ args.files = mergeFileLists(args.files, args.primaryResource)
+ args.files = mergeFileLists(args.files, args.pyFiles)
+ }
+
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
@@ -245,7 +269,6 @@ object SparkSubmit {
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
- val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
if (!isYarnCluster && !args.isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
@@ -270,10 +293,22 @@ object SparkSubmit {
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
- if (args.primaryResource != SPARK_INTERNAL) {
- childArgs += ("--jar", args.primaryResource)
+ if (args.isPython) {
+ val mainPyFile = new Path(args.primaryResource).getName
+ childArgs += ("--primary-py-file", mainPyFile)
+ if (args.pyFiles != null) {
+ // These files will be distributed to each machine's working directory, so strip the
+ // path prefix
+ val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
+ childArgs += ("--py-files", pyFilesNames)
+ }
+ childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
+ } else {
+ if (args.primaryResource != SPARK_INTERNAL) {
+ childArgs += ("--jar", args.primaryResource)
+ }
+ childArgs += ("--class", args.mainClass)
}
- childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
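The hunk above relies on Path#getName to strip any directory or scheme prefix before forwarding file names to the ApplicationMaster. A small illustrative check (assuming hadoop-common is on the classpath, as it is for Spark core; the file names are made up):

import org.apache.hadoop.fs.Path

object PathGetNameExample {
  def main(args: Array[String]): Unit = {
    // getName keeps only the final path component, whatever the scheme.
    println(new Path("hdfs://nn:8020/user/alice/deps/utils.py").getName)    // utils.py
    println(new Path("/tmp/job/xx.py").getName)                             // xx.py
    // The same transformation applied to a comma-separated --py-files list:
    val pyFiles = "hdfs:///a/b/one.py,/tmp/two.py"
    println(pyFiles.split(",").map(p => new Path(p).getName).mkString(",")) // one.py,two.py
  }
}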
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 81ec08cb6d..73e921fd83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -179,18 +179,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
}
- // Require all python files to be local, so we can add them to the PYTHONPATH
- if (isPython) {
- if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
- SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
- }
- val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
- if (nonLocalPyFiles.nonEmpty) {
- SparkSubmit.printErrorAndExit(
- s"Only local additional python files are supported: $nonLocalPyFiles")
- }
- }
-
if (master.startsWith("yarn")) {
val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {