aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala49
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala12
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala14
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala13
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala19
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala20
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala44
8 files changed, 141 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 039c8719e2..53e18c4bce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -26,7 +26,7 @@ import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}
/**
- * A main class used by spark-submit to launch Python applications. It executes python as a
+ * A main class used to launch Python applications. It executes python as a
* subprocess and then has it connect back to the JVM to access system properties, etc.
*/
object PythonRunner {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c240bcd705..02021be9f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -23,6 +23,8 @@ import java.net.URL
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils
@@ -134,12 +136,27 @@ object SparkSubmit {
}
}
+ val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+
+ // Require all python files to be local, so we can add them to the PYTHONPATH
+ // In YARN cluster mode, python files are distributed as regular files, which can be non-local
+ if (args.isPython && !isYarnCluster) {
+ if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
+ printErrorAndExit(s"Only local python files are supported: $args.primaryResource")
+ }
+ val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
+ if (nonLocalPyFiles.nonEmpty) {
+ printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
+ }
+ }
+
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
- case (_, CLUSTER) if args.isPython =>
- printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
+ case (STANDALONE, CLUSTER) if args.isPython =>
+ printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+ "applications on standalone clusters.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
@@ -150,7 +167,7 @@ object SparkSubmit {
}
// If we're running a python app, set the main class to our specific python runner
- if (args.isPython) {
+ if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -167,6 +184,13 @@ object SparkSubmit {
}
}
+ // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
+ // that can be distributed with the job
+ if (args.isPython && isYarnCluster) {
+ args.files = mergeFileLists(args.files, args.primaryResource)
+ args.files = mergeFileLists(args.files, args.pyFiles)
+ }
+
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
@@ -245,7 +269,6 @@ object SparkSubmit {
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
- val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
if (!isYarnCluster && !args.isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
@@ -270,10 +293,22 @@ object SparkSubmit {
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
- if (args.primaryResource != SPARK_INTERNAL) {
- childArgs += ("--jar", args.primaryResource)
+ if (args.isPython) {
+ val mainPyFile = new Path(args.primaryResource).getName
+ childArgs += ("--primary-py-file", mainPyFile)
+ if (args.pyFiles != null) {
+ // These files will be distributed to each machine's working directory, so strip the
+ // path prefix
+ val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
+ childArgs += ("--py-files", pyFilesNames)
+ }
+ childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
+ } else {
+ if (args.primaryResource != SPARK_INTERNAL) {
+ childArgs += ("--jar", args.primaryResource)
+ }
+ childArgs += ("--class", args.mainClass)
}
- childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 81ec08cb6d..73e921fd83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -179,18 +179,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
}
- // Require all python files to be local, so we can add them to the PYTHONPATH
- if (isPython) {
- if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
- SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
- }
- val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
- if (nonLocalPyFiles.nonEmpty) {
- SparkSubmit.printErrorAndExit(
- s"Only local additional python files are supported: $nonLocalPyFiles")
- }
- }
-
if (master.startsWith("yarn")) {
val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d3e327b249..eb328b2b8a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.SparkException
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -135,7 +135,7 @@ private[spark] class ApplicationMaster(
.get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
// Call this to force generation of secret so it gets populated into the
- // Hadoop UGI. This has to happen before the startUserClass which does a
+ // Hadoop UGI. This has to happen before the startUserApplication which does a
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
@@ -254,7 +254,7 @@ private[spark] class ApplicationMaster(
private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
- userClassThread = startUserClass()
+ userClassThread = startUserApplication()
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
@@ -448,9 +448,13 @@ private[spark] class ApplicationMaster(
*
* Returns the user thread that was started.
*/
- private def startUserClass(): Thread = {
- logInfo("Starting the user JAR in a separate Thread")
+ private def startUserApplication(): Thread = {
+ logInfo("Starting the user application in a separate Thread")
System.setProperty("spark.executor.instances", args.numExecutors.toString)
+ if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+ System.setProperty("spark.submit.pyFiles",
+ PythonRunner.formatPaths(args.pyFiles).mkString(","))
+ }
val mainMethod = Class.forName(args.userClass, false,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index d76a63276d..e1a992af3a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -24,6 +24,8 @@ import collection.mutable.ArrayBuffer
class ApplicationMasterArguments(val args: Array[String]) {
var userJar: String = null
var userClass: String = null
+ var primaryPyFile: String = null
+ var pyFiles: String = null
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorCores = 1
@@ -48,6 +50,14 @@ class ApplicationMasterArguments(val args: Array[String]) {
userClass = value
args = tail
+ case ("--primary-py-file") :: value :: tail =>
+ primaryPyFile = value
+ args = tail
+
+ case ("--py-files") :: value :: tail =>
+ pyFiles = value
+ args = tail
+
case ("--args" | "--arg") :: value :: tail =>
userArgsBuffer += value
args = tail
@@ -81,6 +91,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
|Options:
| --jar JAR_PATH Path to your application's JAR file
| --class CLASS_NAME Name of your application's main class
+ | --primary-py-file A main Python file
+ | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
+ | place on the PYTHONPATH for Python apps.
| --args ARGS Arguments to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
| --num-executors NUM Number of executors to start (Default: 2)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1a18e6509e..91e8574e94 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,7 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
import scala.util.{Try, Success, Failure}
import com.google.common.base.Objects
@@ -477,17 +477,32 @@ private[spark] class Client(
} else {
Nil
}
+ val primaryPyFile =
+ if (args.primaryPyFile != null) {
+ Seq("--primary-py-file", args.primaryPyFile)
+ } else {
+ Nil
+ }
+ val pyFiles =
+ if (args.pyFiles != null) {
+ Seq("--py-files", args.pyFiles)
+ } else {
+ Nil
+ }
val amClass =
if (isClusterMode) {
Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
+ if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+ args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
+ }
val userArgs = args.userArgs.flatMap { arg =>
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
- Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
+ Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ userArgs ++
Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 5eb2023802..3bc7eb1abf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -30,7 +30,9 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var archives: String = null
var userJar: String = null
var userClass: String = null
- var userArgs: Seq[String] = Seq[String]()
+ var pyFiles: String = null
+ var primaryPyFile: String = null
+ var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var executorMemory = 1024 // MB
var executorCores = 1
var numExecutors = DEFAULT_NUMBER_EXECUTORS
@@ -132,7 +134,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
}
private def parseArgs(inputArgs: List[String]): Unit = {
- val userArgsBuffer = new ArrayBuffer[String]()
var args = inputArgs
while (!args.isEmpty) {
@@ -145,11 +146,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
userClass = value
args = tail
+ case ("--primary-py-file") :: value :: tail =>
+ primaryPyFile = value
+ args = tail
+
case ("--args" | "--arg") :: value :: tail =>
if (args(0) == "--args") {
println("--args is deprecated. Use --arg instead.")
}
- userArgsBuffer += value
+ userArgs += value
args = tail
case ("--master-class" | "--am-class") :: value :: tail =>
@@ -205,6 +210,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
addJars = value
args = tail
+ case ("--py-files") :: value :: tail =>
+ pyFiles = value
+ args = tail
+
case ("--files") :: value :: tail =>
files = value
args = tail
@@ -219,8 +228,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
throw new IllegalArgumentException(getUsageMessage(args))
}
}
-
- userArgs = userArgsBuffer.readOnly
}
private def getUsageMessage(unknownParam: List[String] = null): String = {
@@ -232,6 +239,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster
| mode)
| --class CLASS_NAME Name of your application's main class (required)
+ | --primary-py-file A main Python file
| --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
| --num-executors NUM Number of executors to start (Default: 2)
@@ -244,6 +252,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| 'default')
| --addJars jars Comma separated list of local jars that want SparkContext.addJar
| to work with.
+ | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
+ | place on the PYTHONPATH for Python apps.
| --files files Comma separated list of files to be distributed with the job.
| --archives archives Comma separated list of archives to be distributed with the job.
""".stripMargin
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d79b85e867..7165918e1b 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -45,6 +45,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
|log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
""".stripMargin
+ private val TEST_PYFILE = """
+ |import sys
+ |from operator import add
+ |
+ |from pyspark import SparkConf , SparkContext
+ |if __name__ == "__main__":
+ | if len(sys.argv) != 3:
+ | print >> sys.stderr, "Usage: test.py [master] [result file]"
+ | exit(-1)
+ | conf = SparkConf()
+ | conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode")
+ | sc = SparkContext(conf=conf)
+ | status = open(sys.argv[2],'w')
+ | result = "failure"
+ | rdd = sc.parallelize(range(10))
+ | cnt = rdd.count()
+ | if cnt == 10:
+ | result = "success"
+ | status.write(result)
+ | status.close()
+ | sc.stop()
+ """.stripMargin
+
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
@@ -98,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+ val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+ sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
+ sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome)
sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
sys.props += ("spark.executor.instances" -> "1")
sys.props += ("spark.driver.extraClassPath" -> childClasspath)
@@ -146,6 +172,24 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
}
+ test("run Python application in yarn-cluster mode") {
+ val primaryPyFile = new File(tempDir, "test.py")
+ Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8)
+ val pyFile = new File(tempDir, "test2.py")
+ Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8)
+ var result = File.createTempFile("result", null, tempDir)
+
+ val args = Array("--class", "org.apache.spark.deploy.PythonRunner",
+ "--primary-py-file", primaryPyFile.getAbsolutePath(),
+ "--py-files", pyFile.getAbsolutePath(),
+ "--arg", "yarn-cluster",
+ "--arg", result.getAbsolutePath(),
+ "--name", "python test in yarn-cluster mode",
+ "--num-executors", "1")
+ Client.main(args)
+ checkResult(result)
+ }
+
/**
* This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
* any sort of error when the job process finishes successfully, but the job itself fails. So