aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-06-10 13:17:29 -0700
committerAndrew Or <andrew@databricks.com>2015-06-10 13:17:29 -0700
commit38112905bc3b33f2ae75274afba1c30e116f6e46 (patch)
treea207e54fe8f1efe3d6c9c8455e3bc55453b47564 /yarn
parent8f7308f9c49805b9486aaae5f60e4481e8ba24e8 (diff)
downloadspark-38112905bc3b33f2ae75274afba1c30e116f6e46.tar.gz
spark-38112905bc3b33f2ae75274afba1c30e116f6e46.tar.bz2
spark-38112905bc3b33f2ae75274afba1c30e116f6e46.zip
[SPARK-5479] [YARN] Handle --py-files correctly in YARN.
The bug description is a little misleading: the actual issue is that .py files are not handled correctly when distributed by YARN. They're added to "spark.submit.pyFiles", which, when processed by context.py, explicitly whitelists certain extensions (see PACKAGE_EXTENSIONS), and that does not include .py files. On top of that, archives were not handled at all! They made it to the driver's python path, but never made it to executors, since the mechanism used to propagate their location (spark.submit.pyFiles) only works on the driver side. So, instead, ignore "spark.submit.pyFiles" and just build PYTHONPATH correctly for both driver and executors. Individual .py files are placed in a subdirectory of the container's local dir in the cluster, which is then added to the python path. Archives are added directly. The change, as a side effect, ends up solving the symptom described in the bug. The issue was not that the files were not being distributed, but that they were never made visible to the python application running under Spark. Also included is a proper unit test for running python on YARN, which broke in several different ways with the previous code. A short walk around of the changes: - SparkSubmit does not try to be smart about how YARN handles python files anymore. It just passes down the configs to the YARN client code. - The YARN client distributes python files and archives differently, placing the files in a subdirectory. - The YARN client now sets PYTHONPATH for the processes it launches; to properly handle different locations, it uses YARN's support for embedding env variables, so to avoid YARN expanding those at the wrong time, SparkConf is now propagated to the AM using a conf file instead of command line options. - Because the Client initialization code is a maze of implicit dependencies, some code needed to be moved around to make sure all needed state was available when the code ran. - The pyspark tests in YarnClusterSuite now actually distribute and try to use both a python file and an archive containing a different python module. Also added a yarn-client tests for completeness. - I cleaned up some of the code around distributing files to YARN, to avoid adding more copied & pasted code to handle the new files being distributed. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #6360 from vanzin/SPARK-5479 and squashes the following commits: bcaf7e6 [Marcelo Vanzin] Feedback. c47501f [Marcelo Vanzin] Fix yarn-client mode. 46b1d0c [Marcelo Vanzin] Merge branch 'master' into SPARK-5479 c743778 [Marcelo Vanzin] Only pyspark cares about python archives. c8e5a82 [Marcelo Vanzin] Actually run pyspark in client mode. 705571d [Marcelo Vanzin] Move some code to the YARN module. 1dd4d0c [Marcelo Vanzin] Review feedback. 71ee736 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479 220358b [Marcelo Vanzin] Scalastyle. cdbb990 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479 7fe3cd4 [Marcelo Vanzin] No need to distribute primary file to executors. 09045f1 [Marcelo Vanzin] Style. 943cbf4 [Marcelo Vanzin] [SPARK-5479] [yarn] Handle --py-files correctly in YARN.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala20
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala12
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala295
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala4
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala5
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala4
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala61
7 files changed, 253 insertions, 148 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 002d7b6eaf..83dafa4a12 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.rpc._
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.SparkException
-import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -46,6 +46,14 @@ private[spark] class ApplicationMaster(
client: YarnRMClient)
extends Logging {
+ // Load the properties file with the Spark configuration and set entries as system properties,
+ // so that user code run inside the AM also has access to them.
+ if (args.propertiesFile != null) {
+ Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
+ sys.props(k) = v
+ }
+ }
+
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
@@ -490,9 +498,11 @@ private[spark] class ApplicationMaster(
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
+ var userArgs = args.userArgs
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
- System.setProperty("spark.submit.pyFiles",
- PythonRunner.formatPaths(args.pyFiles).mkString(","))
+ // When running pyspark, the app is run using PythonRunner. The second argument is the list
+ // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
+ userArgs = Seq(args.primaryPyFile, "") ++ userArgs
}
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
// TODO(davies): add R dependencies here
@@ -503,9 +513,7 @@ private[spark] class ApplicationMaster(
val userThread = new Thread {
override def run() {
try {
- val mainArgs = new Array[String](args.userArgs.size)
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
- mainMethod.invoke(null, mainArgs)
+ mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running users class")
} catch {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index ae6dc1094d..68e9f6b4db 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,11 +26,11 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userClass: String = null
var primaryPyFile: String = null
var primaryRFile: String = null
- var pyFiles: String = null
- var userArgs: Seq[String] = Seq[String]()
+ var userArgs: Seq[String] = Nil
var executorMemory = 1024
var executorCores = 1
var numExecutors = DEFAULT_NUMBER_EXECUTORS
+ var propertiesFile: String = null
parseArgs(args.toList)
@@ -59,10 +59,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
primaryRFile = value
args = tail
- case ("--py-files") :: value :: tail =>
- pyFiles = value
- args = tail
-
case ("--args" | "--arg") :: value :: tail =>
userArgsBuffer += value
args = tail
@@ -79,6 +75,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
executorCores = value
args = tail
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ args = tail
+
case _ =>
printUsageAndExit(1, args)
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f4d43214b0..ec9402afff 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,11 +17,12 @@
package org.apache.spark.deploy.yarn
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException,
+ OutputStreamWriter}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
-import java.util.UUID
+import java.util.{Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConversions._
@@ -29,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.reflect.runtime.universe
import scala.util.{Try, Success, Failure}
+import com.google.common.base.Charsets.UTF_8
import com.google.common.base.Objects
import com.google.common.io.Files
@@ -247,7 +249,9 @@ private[spark] class Client(
* This is used for setting up a container launch context for our ApplicationMaster.
* Exposed for testing.
*/
- def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
+ def prepareLocalResources(
+ appStagingDir: String,
+ pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
logInfo("Preparing resources for our AM container")
// Upload Spark and the application JAR to the remote file system if necessary,
// and add them as local resources to the application master.
@@ -277,20 +281,6 @@ private[spark] class Client(
"for alternatives.")
}
- // If we passed in a keytab, make sure we copy the keytab to the staging directory on
- // HDFS, and setup the relevant environment vars, so the AM can login again.
- if (loginFromKeytab) {
- logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
- " via the YARN Secure Distributed Cache.")
- val localUri = new URI(args.keytab)
- val localPath = getQualifiedLocalPath(localUri, hadoopConf)
- val destinationPath = copyFileToRemote(dst, localPath, replication)
- val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
- distCacheMgr.addResource(
- destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
- sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
- }
-
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
if (distributedUris.contains(uriStr)) {
@@ -303,6 +293,57 @@ private[spark] class Client(
}
/**
+ * Distribute a file to the cluster.
+ *
+ * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
+ * to HDFS (if not already there) and added to the application's distributed cache.
+ *
+ * @param path URI of the file to distribute.
+ * @param resType Type of resource being distributed.
+ * @param destName Name of the file in the distributed cache.
+ * @param targetDir Subdirectory where to place the file.
+ * @param appMasterOnly Whether to distribute only to the AM.
+ * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
+ * localized path for non-local paths, or the input `path` for local paths.
+ * The localized path will be null if the URI has already been added to the cache.
+ */
+ def distribute(
+ path: String,
+ resType: LocalResourceType = LocalResourceType.FILE,
+ destName: Option[String] = None,
+ targetDir: Option[String] = None,
+ appMasterOnly: Boolean = false): (Boolean, String) = {
+ val localURI = new URI(path.trim())
+ if (localURI.getScheme != LOCAL_SCHEME) {
+ if (addDistributedUri(localURI)) {
+ val localPath = getQualifiedLocalPath(localURI, hadoopConf)
+ val linkname = targetDir.map(_ + "/").getOrElse("") +
+ destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
+ val destPath = copyFileToRemote(dst, localPath, replication)
+ distCacheMgr.addResource(
+ fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
+ appMasterOnly = appMasterOnly)
+ (false, linkname)
+ } else {
+ (false, null)
+ }
+ } else {
+ (true, path.trim())
+ }
+ }
+
+ // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+ // HDFS, and setup the relevant environment vars, so the AM can login again.
+ if (loginFromKeytab) {
+ logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+ " via the YARN Secure Distributed Cache.")
+ val (_, localizedPath) = distribute(args.keytab,
+ destName = Some(sparkConf.get("spark.yarn.keytab")),
+ appMasterOnly = true)
+ require(localizedPath != null, "Keytab file already distributed.")
+ }
+
+ /**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
* Each resource is represented by a 3-tuple of:
@@ -314,33 +355,18 @@ private[spark] class Client(
(SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
(APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
("log4j.properties", oldLog4jConf.orNull, null)
- ).foreach { case (destName, _localPath, confKey) =>
- val localPath: String = if (_localPath != null) _localPath.trim() else ""
- if (!localPath.isEmpty()) {
- val localURI = new URI(localPath)
- if (localURI.getScheme != LOCAL_SCHEME) {
- if (addDistributedUri(localURI)) {
- val src = getQualifiedLocalPath(localURI, hadoopConf)
- val destPath = copyFileToRemote(dst, src, replication)
- val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
- distCacheMgr.addResource(destFs, hadoopConf, destPath,
- localResources, LocalResourceType.FILE, destName, statCache)
- }
- } else if (confKey != null) {
+ ).foreach { case (destName, path, confKey) =>
+ if (path != null && !path.trim().isEmpty()) {
+ val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
+ if (isLocal && confKey != null) {
+ require(localizedPath != null, s"Path $path already distributed.")
// If the resource is intended for local use only, handle this downstream
// by setting the appropriate property
- sparkConf.set(confKey, localPath)
+ sparkConf.set(confKey, localizedPath)
}
}
}
- createConfArchive().foreach { file =>
- require(addDistributedUri(file.toURI()))
- val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
- distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
- LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
- }
-
/**
* Do the same for any additional resources passed in through ClientArguments.
* Each resource category is represented by a 3-tuple of:
@@ -356,21 +382,10 @@ private[spark] class Client(
).foreach { case (flist, resType, addToClasspath) =>
if (flist != null && !flist.isEmpty()) {
flist.split(',').foreach { file =>
- val localURI = new URI(file.trim())
- if (localURI.getScheme != LOCAL_SCHEME) {
- if (addDistributedUri(localURI)) {
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyFileToRemote(dst, localPath, replication)
- distCacheMgr.addResource(
- fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
- if (addToClasspath) {
- cachedSecondaryJarLinks += linkname
- }
- }
- } else if (addToClasspath) {
- // Resource is intended for local use only and should be added to the class path
- cachedSecondaryJarLinks += file.trim()
+ val (_, localizedPath) = distribute(file, resType = resType)
+ require(localizedPath != null)
+ if (addToClasspath) {
+ cachedSecondaryJarLinks += localizedPath
}
}
}
@@ -379,11 +394,31 @@ private[spark] class Client(
sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
}
+ if (isClusterMode && args.primaryPyFile != null) {
+ distribute(args.primaryPyFile, appMasterOnly = true)
+ }
+
+ pySparkArchives.foreach { f => distribute(f) }
+
+ // The python files list needs to be treated especially. All files that are not an
+ // archive need to be placed in a subdirectory that will be added to PYTHONPATH.
+ args.pyFiles.foreach { f =>
+ val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
+ distribute(f, targetDir = targetDir)
+ }
+
+ // Distribute an archive with Hadoop and Spark configuration for the AM.
+ val (_, confLocalizedPath) = distribute(createConfArchive().getAbsolutePath(),
+ resType = LocalResourceType.ARCHIVE,
+ destName = Some(LOCALIZED_CONF_DIR),
+ appMasterOnly = true)
+ require(confLocalizedPath != null)
+
localResources
}
/**
- * Create an archive with the Hadoop config files for distribution.
+ * Create an archive with the config files for distribution.
*
* These are only used by the AM, since executors will use the configuration object broadcast by
* the driver. The files are zipped and added to the job as an archive, so that YARN will explode
@@ -395,8 +430,11 @@ private[spark] class Client(
*
* Currently this makes a shallow copy of the conf directory. If there are cases where a
* Hadoop config directory contains subdirectories, this code will have to be fixed.
+ *
+ * The archive also contains some Spark configuration. Namely, it saves the contents of
+ * SparkConf in a file to be loaded by the AM process.
*/
- private def createConfArchive(): Option[File] = {
+ private def createConfArchive(): File = {
val hadoopConfFiles = new HashMap[String, File]()
Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
sys.env.get(envKey).foreach { path =>
@@ -411,28 +449,32 @@ private[spark] class Client(
}
}
- if (!hadoopConfFiles.isEmpty) {
- val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
- new File(Utils.getLocalDir(sparkConf)))
+ val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip",
+ new File(Utils.getLocalDir(sparkConf)))
+ val confStream = new ZipOutputStream(new FileOutputStream(confArchive))
- val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
- try {
- hadoopConfStream.setLevel(0)
- hadoopConfFiles.foreach { case (name, file) =>
- if (file.canRead()) {
- hadoopConfStream.putNextEntry(new ZipEntry(name))
- Files.copy(file, hadoopConfStream)
- hadoopConfStream.closeEntry()
- }
+ try {
+ confStream.setLevel(0)
+ hadoopConfFiles.foreach { case (name, file) =>
+ if (file.canRead()) {
+ confStream.putNextEntry(new ZipEntry(name))
+ Files.copy(file, confStream)
+ confStream.closeEntry()
}
- } finally {
- hadoopConfStream.close()
}
- Some(hadoopConfArchive)
- } else {
- None
+ // Save Spark configuration to a file in the archive.
+ val props = new Properties()
+ sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
+ confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
+ val writer = new OutputStreamWriter(confStream, UTF_8)
+ props.store(writer, "Spark configuration.")
+ writer.flush()
+ confStream.closeEntry()
+ } finally {
+ confStream.close()
}
+ confArchive
}
/**
@@ -460,7 +502,9 @@ private[spark] class Client(
/**
* Set up the environment for launching our ApplicationMaster container.
*/
- private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
+ private def setupLaunchEnv(
+ stagingDir: String,
+ pySparkArchives: Seq[String]): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
@@ -478,9 +522,6 @@ private[spark] class Client(
val renewalInterval = getTokenRenewalInterval(stagingDirPath)
sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
}
- // Set the environment variables to be passed on to the executors.
- distCacheMgr.setDistFilesEnv(env)
- distCacheMgr.setDistArchivesEnv(env)
// Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
val amEnvPrefix = "spark.yarn.appMasterEnv."
@@ -497,15 +538,32 @@ private[spark] class Client(
env("SPARK_YARN_USER_ENV") = userEnvs
}
- // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
- // that can be passed on to the ApplicationMaster and the executors.
- if (sparkConf.contains("spark.submit.pyArchives")) {
- var pythonPath = sparkConf.get("spark.submit.pyArchives")
- if (env.contains("PYTHONPATH")) {
- pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+ // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH
+ // of the container processes too. Add all non-.py files directly to PYTHONPATH.
+ //
+ // NOTE: the code currently does not handle .py files defined with a "local:" scheme.
+ val pythonPath = new ListBuffer[String]()
+ val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py"))
+ if (pyFiles.nonEmpty) {
+ pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ LOCALIZED_PYTHON_DIR)
+ }
+ (pySparkArchives ++ pyArchives).foreach { path =>
+ val uri = new URI(path)
+ if (uri.getScheme != LOCAL_SCHEME) {
+ pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ new Path(path).getName())
+ } else {
+ pythonPath += uri.getPath()
}
- env("PYTHONPATH") = pythonPath
- sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+ }
+
+ // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
+ if (pythonPath.nonEmpty) {
+ val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath)
+ .mkString(YarnSparkHadoopUtil.getClassPathSeparator)
+ env("PYTHONPATH") = pythonPathStr
+ sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
}
// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
@@ -555,8 +613,19 @@ private[spark] class Client(
logInfo("Setting up container launch context for our AM")
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
- val localResources = prepareLocalResources(appStagingDir)
- val launchEnv = setupLaunchEnv(appStagingDir)
+ val pySparkArchives =
+ if (sys.props.getOrElse("spark.yarn.isPython", "false").toBoolean) {
+ findPySparkArchives()
+ } else {
+ Nil
+ }
+ val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
+ val localResources = prepareLocalResources(appStagingDir, pySparkArchives)
+
+ // Set the environment variables to be passed on to the executors.
+ distCacheMgr.setDistFilesEnv(launchEnv)
+ distCacheMgr.setDistArchivesEnv(launchEnv)
+
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
amContainer.setEnvironment(launchEnv)
@@ -596,13 +665,6 @@ private[spark] class Client(
javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}
- // Forward the Spark configuration to the application master / executors.
- // TODO: it might be nicer to pass these as an internal environment variable rather than
- // as Java options, due to complications with string parsing of nested quotes.
- for ((k, v) <- sparkConf.getAll) {
- javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
- }
-
// Include driver-specific java options if we are launching a driver
if (isClusterMode) {
val driverOpts = sparkConf.getOption("spark.driver.extraJavaOptions")
@@ -655,14 +717,8 @@ private[spark] class Client(
Nil
}
val primaryPyFile =
- if (args.primaryPyFile != null) {
- Seq("--primary-py-file", args.primaryPyFile)
- } else {
- Nil
- }
- val pyFiles =
- if (args.pyFiles != null) {
- Seq("--py-files", args.pyFiles)
+ if (isClusterMode && args.primaryPyFile != null) {
+ Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
} else {
Nil
}
@@ -678,9 +734,6 @@ private[spark] class Client(
} else {
Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
- if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
- args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
- }
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
}
@@ -688,11 +741,13 @@ private[spark] class Client(
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
- Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++
+ Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
userArgs ++ Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
- "--num-executors ", args.numExecutors.toString)
+ "--num-executors ", args.numExecutors.toString,
+ "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
// Command for the ApplicationMaster
val commands = prefixEnv ++ Seq(
@@ -857,6 +912,22 @@ private[spark] class Client(
}
}
}
+
+ private def findPySparkArchives(): Seq[String] = {
+ sys.env.get("PYSPARK_ARCHIVES_PATH")
+ .map(_.split(",").toSeq)
+ .getOrElse {
+ val pyLibPath = Seq(sys.env("SPARK_HOME"), "python", "lib").mkString(File.separator)
+ val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+ require(pyArchivesFile.exists(),
+ "pyspark.zip not found; cannot run pyspark application in YARN mode.")
+ val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+ require(py4jFile.exists(),
+ "py4j-0.8.2.1-src.zip not found; cannot run pyspark application in YARN mode.")
+ Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
+ }
+ }
+
}
object Client extends Logging {
@@ -907,8 +978,14 @@ object Client extends Logging {
// Distribution-defined classpath to add to processes
val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
- // Subdirectory where the user's hadoop config files will be placed.
- val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+ // Subdirectory where the user's Spark and Hadoop config files will be placed.
+ val LOCALIZED_CONF_DIR = "__spark_conf__"
+
+ // Name of the file in the conf archive containing Spark configuration.
+ val SPARK_CONF_FILE = "__spark_conf__.properties"
+
+ // Subdirectory where the user's python files (not archives) will be placed.
+ val LOCALIZED_PYTHON_DIR = "__pyfiles__"
/**
* Find the user-defined Spark jar if configured, or return the jar containing this
@@ -1033,7 +1110,7 @@ object Client extends Logging {
if (isAM) {
addClasspathEntry(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
- LOCALIZED_HADOOP_CONF_DIR, env)
+ LOCALIZED_CONF_DIR, env)
}
if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 9c7b1b3988..35e990602a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -30,7 +30,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var archives: String = null
var userJar: String = null
var userClass: String = null
- var pyFiles: String = null
+ var pyFiles: Seq[String] = Nil
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
@@ -228,7 +228,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
args = tail
case ("--py-files") :: value :: tail =>
- pyFiles = value
+ pyFiles = value.split(",")
args = tail
case ("--files") :: value :: tail =>
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 99c05329b4..1c8d7ec576 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -76,7 +76,8 @@ private[spark] class YarnClientSchedulerBackend(
("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
- ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue")
+ ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+ ("--py-files", null, "spark.submit.pyFiles")
)
// Warn against the following deprecated environment variables: env var -> suggestion
val deprecatedEnvVars = Map(
@@ -86,7 +87,7 @@ private[spark] class YarnClientSchedulerBackend(
optionTuples.foreach { case (optionName, envVar, sparkProp) =>
if (sc.getConf.contains(sparkProp)) {
extraArgs += (optionName, sc.getConf.get(sparkProp))
- } else if (System.getenv(envVar) != null) {
+ } else if (envVar != null && System.getenv(envVar) != null) {
extraArgs += (optionName, System.getenv(envVar))
if (deprecatedEnvVars.contains(envVar)) {
logWarning(s"NOTE: $envVar is deprecated. Use ${deprecatedEnvVars(envVar)} instead.")
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 01d33c9ce9..4ec976aa31 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -113,7 +113,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
Environment.PWD.$()
}
cp should contain(pwdVar)
- cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
+ cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
}
@@ -129,7 +129,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
val tempDir = Utils.createTempDir()
try {
- client.prepareLocalResources(tempDir.getAbsolutePath())
+ client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER))
// The non-local path should be propagated by name only, since it will end up in the app's
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 93d587d0cb..a0f25ba450 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -56,6 +56,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
""".stripMargin
private val TEST_PYFILE = """
+ |import mod1, mod2
|import sys
|from operator import add
|
@@ -67,7 +68,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
| sc = SparkContext(conf=SparkConf())
| status = open(sys.argv[1],'w')
| result = "failure"
- | rdd = sc.parallelize(range(10))
+ | rdd = sc.parallelize(range(10)).map(lambda x: x * mod1.func() * mod2.func())
| cnt = rdd.count()
| if cnt == 10:
| result = "success"
@@ -76,6 +77,11 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
| sc.stop()
""".stripMargin
+ private val TEST_PYMODULE = """
+ |def func():
+ | return 42
+ """.stripMargin
+
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
@@ -124,7 +130,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
- hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+ hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR)
assert(hadoopConfDir.mkdir())
File.createTempFile("token", ".txt", hadoopConfDir)
}
@@ -151,26 +157,12 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
}
}
- // Enable this once fix SPARK-6700
- test("run Python application in yarn-cluster mode") {
- val primaryPyFile = new File(tempDir, "test.py")
- Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
- val pyFile = new File(tempDir, "test2.py")
- Files.write(TEST_PYFILE, pyFile, UTF_8)
- var result = File.createTempFile("result", null, tempDir)
+ test("run Python application in yarn-client mode") {
+ testPySpark(true)
+ }
- // The sbt assembly does not include pyspark / py4j python dependencies, so we need to
- // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala.
- val sparkHome = sys.props("spark.test.home")
- val extraConf = Map(
- "spark.executorEnv.SPARK_HOME" -> sparkHome,
- "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
-
- runSpark(false, primaryPyFile.getAbsolutePath(),
- sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()),
- appArgs = Seq(result.getAbsolutePath()),
- extraConf = extraConf)
- checkResult(result)
+ test("run Python application in yarn-cluster mode") {
+ testPySpark(false)
}
test("user class path first in client mode") {
@@ -188,6 +180,33 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
checkResult(result)
}
+ private def testPySpark(clientMode: Boolean): Unit = {
+ val primaryPyFile = new File(tempDir, "test.py")
+ Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
+
+ val moduleDir =
+ if (clientMode) {
+ // In client-mode, .py files added with --py-files are not visible in the driver.
+ // This is something that the launcher library would have to handle.
+ tempDir
+ } else {
+ val subdir = new File(tempDir, "pyModules")
+ subdir.mkdir()
+ subdir
+ }
+ val pyModule = new File(moduleDir, "mod1.py")
+ Files.write(TEST_PYMODULE, pyModule, UTF_8)
+
+ val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir)
+ val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
+ val result = File.createTempFile("result", null, tempDir)
+
+ runSpark(clientMode, primaryPyFile.getAbsolutePath(),
+ sparkArgs = Seq("--py-files", pyFiles),
+ appArgs = Seq(result.getAbsolutePath()))
+ checkResult(result)
+ }
+
private def testUseClassPathFirst(clientMode: Boolean): Unit = {
// Create a jar file that contains a different version of "test.resource".
val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)