-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                            |  77
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                 |  20
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala        |  12
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                            | 295
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala                   |   4
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  |   5
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala                       |   4
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala                  |  61
8 files changed, 270 insertions(+), 208 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a0eae77426..b8978e25a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -324,55 +324,20 @@ object SparkSubmit {
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
- args.files = mergeFileLists(args.files, args.primaryResource)
+ if (clusterManager != YARN) {
+ // The YARN backend distributes the primary file differently, so don't merge it.
+ args.files = mergeFileLists(args.files, args.primaryResource)
+ }
+ }
+ if (clusterManager != YARN) {
+ // The YARN backend handles python files differently, so don't merge the lists.
+ args.files = mergeFileLists(args.files, args.pyFiles)
}
- args.files = mergeFileLists(args.files, args.pyFiles)
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
}
- // In yarn mode for a python app, add pyspark archives to files
- // that can be distributed with the job
- if (args.isPython && clusterManager == YARN) {
- var pyArchives: String = null
- val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
- if (pyArchivesEnvOpt.isDefined) {
- pyArchives = pyArchivesEnvOpt.get
- } else {
- if (!sys.env.contains("SPARK_HOME")) {
- printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
- }
- val pythonPath = new ArrayBuffer[String]
- for (sparkHome <- sys.env.get("SPARK_HOME")) {
- val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
- val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
- if (!pyArchivesFile.exists()) {
- printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
- }
- val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
- if (!py4jFile.exists()) {
- printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
- "in yarn mode.")
- }
- pythonPath += pyArchivesFile.getAbsolutePath()
- pythonPath += py4jFile.getAbsolutePath()
- }
- pyArchives = pythonPath.mkString(",")
- }
-
- pyArchives = pyArchives.split(",").map { localPath =>
- val localURI = Utils.resolveURI(localPath)
- if (localURI.getScheme != "local") {
- args.files = mergeFileLists(args.files, localURI.toString)
- new Path(localPath).getName
- } else {
- localURI.getPath
- }
- }.mkString(File.pathSeparator)
- sysProps("spark.submit.pyArchives") = pyArchives
- }
-
// If we're running a R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
@@ -386,19 +351,10 @@ object SparkSubmit {
}
}
- if (isYarnCluster) {
- // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
- // that can be distributed with the job
- if (args.isPython) {
- args.files = mergeFileLists(args.files, args.primaryResource)
- args.files = mergeFileLists(args.files, args.pyFiles)
- }
-
+ if (isYarnCluster && args.isR) {
// In yarn-cluster mode for a R app, add primary resource to files
// that can be distributed with the job
- if (args.isR) {
- args.files = mergeFileLists(args.files, args.primaryResource)
- }
+ args.files = mergeFileLists(args.files, args.primaryResource)
}
// Special flag to avoid deprecation warnings at the client
@@ -515,17 +471,18 @@ object SparkSubmit {
}
}
+ // Let YARN know it's a pyspark app, so it distributes needed libraries.
+ if (clusterManager == YARN && args.isPython) {
+ sysProps.put("spark.yarn.isPython", "true")
+ }
+
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.isPython) {
- val mainPyFile = new Path(args.primaryResource).getName
- childArgs += ("--primary-py-file", mainPyFile)
+ childArgs += ("--primary-py-file", args.primaryResource)
if (args.pyFiles != null) {
- // These files will be distributed to each machine's working directory, so strip the
- // path prefix
- val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
- childArgs += ("--py-files", pyFilesNames)
+ childArgs += ("--py-files", args.pyFiles)
}
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
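The net effect of the SparkSubmit changes above is that, for a python app on YARN, the primary resource and --py-files are no longer merged into args.files; instead spark.yarn.isPython is set and the raw paths are forwarded to org.apache.spark.deploy.yarn.Client, which handles distribution itself. A minimal sketch of that child-argument shape for a yarn-cluster python submit (all names below are illustrative, not Spark's internals):

import scala.collection.mutable.ArrayBuffer

object YarnPySubmitSketch {
  // Build the arguments handed to yarn.Client: the full primary resource path and the
  // unmodified --py-files list, with PythonRunner as the wrapped user class.
  def buildChildArgs(primaryResource: String, pyFiles: Option[String]): Seq[String] = {
    val childArgs = new ArrayBuffer[String]()
    childArgs += "--primary-py-file"
    childArgs += primaryResource
    pyFiles.foreach { files =>
      childArgs += "--py-files"
      childArgs += files
    }
    childArgs += "--class"
    childArgs += "org.apache.spark.deploy.PythonRunner"
    childArgs.toSeq
  }

  def main(args: Array[String]): Unit =
    println(buildChildArgs("/tmp/app.py", Some("deps.zip,util.py")).mkString(" "))
}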
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 002d7b6eaf..83dafa4a12 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.rpc._
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.SparkException
-import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -46,6 +46,14 @@ private[spark] class ApplicationMaster(
client: YarnRMClient)
extends Logging {
+ // Load the properties file with the Spark configuration and set entries as system properties,
+ // so that user code run inside the AM also has access to them.
+ if (args.propertiesFile != null) {
+ Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
+ sys.props(k) = v
+ }
+ }
+
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
@@ -490,9 +498,11 @@ private[spark] class ApplicationMaster(
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
+ var userArgs = args.userArgs
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
- System.setProperty("spark.submit.pyFiles",
- PythonRunner.formatPaths(args.pyFiles).mkString(","))
+ // When running pyspark, the app is run using PythonRunner. The second argument is the list
+ // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
+ userArgs = Seq(args.primaryPyFile, "") ++ userArgs
}
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
// TODO(davies): add R dependencies here
@@ -503,9 +513,7 @@ private[spark] class ApplicationMaster(
val userThread = new Thread {
override def run() {
try {
- val mainArgs = new Array[String](args.userArgs.size)
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
- mainMethod.invoke(null, mainArgs)
+ mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running users class")
} catch {
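The ApplicationMaster hunks above load the localized properties file into system properties and, for pyspark apps, prepend the primary python file plus an empty PYTHONPATH-list slot to the user arguments before the reflective main() call. A small self-contained sketch of that invocation pattern, with hypothetical names:

object ReflectiveMainSketch {
  def run(mainClassName: String, primaryPyFile: Option[String], userArgs: Seq[String]): Unit = {
    // For a python app the wrapped runner expects: <primary py file> <extra PYTHONPATH entries>
    // [app args]; the second slot is left empty here, assuming localization handles it.
    val finalArgs = primaryPyFile.map(py => Seq(py, "") ++ userArgs).getOrElse(userArgs)
    val mainMethod = Class.forName(mainClassName).getMethod("main", classOf[Array[String]])
    // Passing the whole Array[String] as a single reflective argument matches main's signature.
    mainMethod.invoke(null, finalArgs.toArray)
  }
}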
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index ae6dc1094d..68e9f6b4db 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,11 +26,11 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userClass: String = null
var primaryPyFile: String = null
var primaryRFile: String = null
- var pyFiles: String = null
- var userArgs: Seq[String] = Seq[String]()
+ var userArgs: Seq[String] = Nil
var executorMemory = 1024
var executorCores = 1
var numExecutors = DEFAULT_NUMBER_EXECUTORS
+ var propertiesFile: String = null
parseArgs(args.toList)
@@ -59,10 +59,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
primaryRFile = value
args = tail
- case ("--py-files") :: value :: tail =>
- pyFiles = value
- args = tail
-
case ("--args" | "--arg") :: value :: tail =>
userArgsBuffer += value
args = tail
@@ -79,6 +75,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
executorCores = value
args = tail
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ args = tail
+
case _ =>
printUsageAndExit(1, args)
}
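For reference, the list-pattern parsing style used by ApplicationMasterArguments, reduced to the two options touched by this change (--arg accumulation and the new --properties-file); a hedged sketch with illustrative names:

object AmArgsParseSketch {
  case class Parsed(userArgs: List[String] = Nil, propertiesFile: Option[String] = None)

  @annotation.tailrec
  def parse(args: List[String], acc: Parsed = Parsed()): Parsed = args match {
    case ("--args" | "--arg") :: value :: tail =>
      parse(tail, acc.copy(userArgs = acc.userArgs :+ value))
    case "--properties-file" :: value :: tail =>
      parse(tail, acc.copy(propertiesFile = Some(value)))
    case Nil => acc
    case other :: _ => sys.error(s"Unknown option: $other")
  }

  def main(args: Array[String]): Unit =
    println(parse(List("--arg", "foo", "--properties-file", "spark.properties")))
}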
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f4d43214b0..ec9402afff 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,11 +17,12 @@
package org.apache.spark.deploy.yarn
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException,
+ OutputStreamWriter}
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
-import java.util.UUID
+import java.util.{Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConversions._
@@ -29,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.reflect.runtime.universe
import scala.util.{Try, Success, Failure}
+import com.google.common.base.Charsets.UTF_8
import com.google.common.base.Objects
import com.google.common.io.Files
@@ -247,7 +249,9 @@ private[spark] class Client(
* This is used for setting up a container launch context for our ApplicationMaster.
* Exposed for testing.
*/
- def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
+ def prepareLocalResources(
+ appStagingDir: String,
+ pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
logInfo("Preparing resources for our AM container")
// Upload Spark and the application JAR to the remote file system if necessary,
// and add them as local resources to the application master.
@@ -277,20 +281,6 @@ private[spark] class Client(
"for alternatives.")
}
- // If we passed in a keytab, make sure we copy the keytab to the staging directory on
- // HDFS, and setup the relevant environment vars, so the AM can login again.
- if (loginFromKeytab) {
- logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
- " via the YARN Secure Distributed Cache.")
- val localUri = new URI(args.keytab)
- val localPath = getQualifiedLocalPath(localUri, hadoopConf)
- val destinationPath = copyFileToRemote(dst, localPath, replication)
- val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
- distCacheMgr.addResource(
- destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
- sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
- }
-
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
if (distributedUris.contains(uriStr)) {
@@ -303,6 +293,57 @@ private[spark] class Client(
}
/**
+ * Distribute a file to the cluster.
+ *
+ * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
+ * to HDFS (if not already there) and added to the application's distributed cache.
+ *
+ * @param path URI of the file to distribute.
+ * @param resType Type of resource being distributed.
+ * @param destName Name of the file in the distributed cache.
+ * @param targetDir Subdirectory where to place the file.
+ * @param appMasterOnly Whether to distribute only to the AM.
+ * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
+ * localized path for non-local paths, or the input `path` for local paths.
+ * The localized path will be null if the URI has already been added to the cache.
+ */
+ def distribute(
+ path: String,
+ resType: LocalResourceType = LocalResourceType.FILE,
+ destName: Option[String] = None,
+ targetDir: Option[String] = None,
+ appMasterOnly: Boolean = false): (Boolean, String) = {
+ val localURI = new URI(path.trim())
+ if (localURI.getScheme != LOCAL_SCHEME) {
+ if (addDistributedUri(localURI)) {
+ val localPath = getQualifiedLocalPath(localURI, hadoopConf)
+ val linkname = targetDir.map(_ + "/").getOrElse("") +
+ destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
+ val destPath = copyFileToRemote(dst, localPath, replication)
+ distCacheMgr.addResource(
+ fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
+ appMasterOnly = appMasterOnly)
+ (false, linkname)
+ } else {
+ (false, null)
+ }
+ } else {
+ (true, path.trim())
+ }
+ }
+
+ // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+ // HDFS, and setup the relevant environment vars, so the AM can login again.
+ if (loginFromKeytab) {
+ logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+ " via the YARN Secure Distributed Cache.")
+ val (_, localizedPath) = distribute(args.keytab,
+ destName = Some(sparkConf.get("spark.yarn.keytab")),
+ appMasterOnly = true)
+ require(localizedPath != null, "Keytab file already distributed.")
+ }
+
+ /**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
* Each resource is represented by a 3-tuple of:
@@ -314,33 +355,18 @@ private[spark] class Client(
(SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
(APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
("log4j.properties", oldLog4jConf.orNull, null)
- ).foreach { case (destName, _localPath, confKey) =>
- val localPath: String = if (_localPath != null) _localPath.trim() else ""
- if (!localPath.isEmpty()) {
- val localURI = new URI(localPath)
- if (localURI.getScheme != LOCAL_SCHEME) {
- if (addDistributedUri(localURI)) {
- val src = getQualifiedLocalPath(localURI, hadoopConf)
- val destPath = copyFileToRemote(dst, src, replication)
- val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
- distCacheMgr.addResource(destFs, hadoopConf, destPath,
- localResources, LocalResourceType.FILE, destName, statCache)
- }
- } else if (confKey != null) {
+ ).foreach { case (destName, path, confKey) =>
+ if (path != null && !path.trim().isEmpty()) {
+ val (isLocal, localizedPath) = distribute(path, destName = Some(destName))
+ if (isLocal && confKey != null) {
+ require(localizedPath != null, s"Path $path already distributed.")
// If the resource is intended for local use only, handle this downstream
// by setting the appropriate property
- sparkConf.set(confKey, localPath)
+ sparkConf.set(confKey, localizedPath)
}
}
}
- createConfArchive().foreach { file =>
- require(addDistributedUri(file.toURI()))
- val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
- distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
- LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
- }
-
/**
* Do the same for any additional resources passed in through ClientArguments.
* Each resource category is represented by a 3-tuple of:
@@ -356,21 +382,10 @@ private[spark] class Client(
).foreach { case (flist, resType, addToClasspath) =>
if (flist != null && !flist.isEmpty()) {
flist.split(',').foreach { file =>
- val localURI = new URI(file.trim())
- if (localURI.getScheme != LOCAL_SCHEME) {
- if (addDistributedUri(localURI)) {
- val localPath = new Path(localURI)
- val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
- val destPath = copyFileToRemote(dst, localPath, replication)
- distCacheMgr.addResource(
- fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
- if (addToClasspath) {
- cachedSecondaryJarLinks += linkname
- }
- }
- } else if (addToClasspath) {
- // Resource is intended for local use only and should be added to the class path
- cachedSecondaryJarLinks += file.trim()
+ val (_, localizedPath) = distribute(file, resType = resType)
+ require(localizedPath != null)
+ if (addToClasspath) {
+ cachedSecondaryJarLinks += localizedPath
}
}
}
@@ -379,11 +394,31 @@ private[spark] class Client(
sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
}
+ if (isClusterMode && args.primaryPyFile != null) {
+ distribute(args.primaryPyFile, appMasterOnly = true)
+ }
+
+ pySparkArchives.foreach { f => distribute(f) }
+
+ // The python files list needs to be treated especially. All files that are not an
+ // archive need to be placed in a subdirectory that will be added to PYTHONPATH.
+ args.pyFiles.foreach { f =>
+ val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
+ distribute(f, targetDir = targetDir)
+ }
+
+ // Distribute an archive with Hadoop and Spark configuration for the AM.
+ val (_, confLocalizedPath) = distribute(createConfArchive().getAbsolutePath(),
+ resType = LocalResourceType.ARCHIVE,
+ destName = Some(LOCALIZED_CONF_DIR),
+ appMasterOnly = true)
+ require(confLocalizedPath != null)
+
localResources
}
/**
- * Create an archive with the Hadoop config files for distribution.
+ * Create an archive with the config files for distribution.
*
* These are only used by the AM, since executors will use the configuration object broadcast by
* the driver. The files are zipped and added to the job as an archive, so that YARN will explode
@@ -395,8 +430,11 @@ private[spark] class Client(
*
* Currently this makes a shallow copy of the conf directory. If there are cases where a
* Hadoop config directory contains subdirectories, this code will have to be fixed.
+ *
+ * The archive also contains some Spark configuration. Namely, it saves the contents of
+ * SparkConf in a file to be loaded by the AM process.
*/
- private def createConfArchive(): Option[File] = {
+ private def createConfArchive(): File = {
val hadoopConfFiles = new HashMap[String, File]()
Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
sys.env.get(envKey).foreach { path =>
@@ -411,28 +449,32 @@ private[spark] class Client(
}
}
- if (!hadoopConfFiles.isEmpty) {
- val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
- new File(Utils.getLocalDir(sparkConf)))
+ val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip",
+ new File(Utils.getLocalDir(sparkConf)))
+ val confStream = new ZipOutputStream(new FileOutputStream(confArchive))
- val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
- try {
- hadoopConfStream.setLevel(0)
- hadoopConfFiles.foreach { case (name, file) =>
- if (file.canRead()) {
- hadoopConfStream.putNextEntry(new ZipEntry(name))
- Files.copy(file, hadoopConfStream)
- hadoopConfStream.closeEntry()
- }
+ try {
+ confStream.setLevel(0)
+ hadoopConfFiles.foreach { case (name, file) =>
+ if (file.canRead()) {
+ confStream.putNextEntry(new ZipEntry(name))
+ Files.copy(file, confStream)
+ confStream.closeEntry()
}
- } finally {
- hadoopConfStream.close()
}
- Some(hadoopConfArchive)
- } else {
- None
+ // Save Spark configuration to a file in the archive.
+ val props = new Properties()
+ sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
+ confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
+ val writer = new OutputStreamWriter(confStream, UTF_8)
+ props.store(writer, "Spark configuration.")
+ writer.flush()
+ confStream.closeEntry()
+ } finally {
+ confStream.close()
}
+ confArchive
}
/**
@@ -460,7 +502,9 @@ private[spark] class Client(
/**
* Set up the environment for launching our ApplicationMaster container.
*/
- private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
+ private def setupLaunchEnv(
+ stagingDir: String,
+ pySparkArchives: Seq[String]): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
@@ -478,9 +522,6 @@ private[spark] class Client(
val renewalInterval = getTokenRenewalInterval(stagingDirPath)
sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
}
- // Set the environment variables to be passed on to the executors.
- distCacheMgr.setDistFilesEnv(env)
- distCacheMgr.setDistArchivesEnv(env)
// Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
val amEnvPrefix = "spark.yarn.appMasterEnv."
@@ -497,15 +538,32 @@ private[spark] class Client(
env("SPARK_YARN_USER_ENV") = userEnvs
}
- // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
- // that can be passed on to the ApplicationMaster and the executors.
- if (sparkConf.contains("spark.submit.pyArchives")) {
- var pythonPath = sparkConf.get("spark.submit.pyArchives")
- if (env.contains("PYTHONPATH")) {
- pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+ // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH
+ // of the container processes too. Add all non-.py files directly to PYTHONPATH.
+ //
+ // NOTE: the code currently does not handle .py files defined with a "local:" scheme.
+ val pythonPath = new ListBuffer[String]()
+ val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py"))
+ if (pyFiles.nonEmpty) {
+ pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ LOCALIZED_PYTHON_DIR)
+ }
+ (pySparkArchives ++ pyArchives).foreach { path =>
+ val uri = new URI(path)
+ if (uri.getScheme != LOCAL_SCHEME) {
+ pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ new Path(path).getName())
+ } else {
+ pythonPath += uri.getPath()
}
- env("PYTHONPATH") = pythonPath
- sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+ }
+
+ // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
+ if (pythonPath.nonEmpty) {
+ val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath)
+ .mkString(YarnSparkHadoopUtil.getClassPathSeparator)
+ env("PYTHONPATH") = pythonPathStr
+ sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
}
// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
@@ -555,8 +613,19 @@ private[spark] class Client(
logInfo("Setting up container launch context for our AM")
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
- val localResources = prepareLocalResources(appStagingDir)
- val launchEnv = setupLaunchEnv(appStagingDir)
+ val pySparkArchives =
+ if (sys.props.getOrElse("spark.yarn.isPython", "false").toBoolean) {
+ findPySparkArchives()
+ } else {
+ Nil
+ }
+ val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
+ val localResources = prepareLocalResources(appStagingDir, pySparkArchives)
+
+ // Set the environment variables to be passed on to the executors.
+ distCacheMgr.setDistFilesEnv(launchEnv)
+ distCacheMgr.setDistArchivesEnv(launchEnv)
+
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
amContainer.setEnvironment(launchEnv)
@@ -596,13 +665,6 @@ private[spark] class Client(
javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}
- // Forward the Spark configuration to the application master / executors.
- // TODO: it might be nicer to pass these as an internal environment variable rather than
- // as Java options, due to complications with string parsing of nested quotes.
- for ((k, v) <- sparkConf.getAll) {
- javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
- }
-
// Include driver-specific java options if we are launching a driver
if (isClusterMode) {
val driverOpts = sparkConf.getOption("spark.driver.extraJavaOptions")
@@ -655,14 +717,8 @@ private[spark] class Client(
Nil
}
val primaryPyFile =
- if (args.primaryPyFile != null) {
- Seq("--primary-py-file", args.primaryPyFile)
- } else {
- Nil
- }
- val pyFiles =
- if (args.pyFiles != null) {
- Seq("--py-files", args.pyFiles)
+ if (isClusterMode && args.primaryPyFile != null) {
+ Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
} else {
Nil
}
@@ -678,9 +734,6 @@ private[spark] class Client(
} else {
Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
- if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
- args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
- }
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
}
@@ -688,11 +741,13 @@ private[spark] class Client(
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
- Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++
+ Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
userArgs ++ Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
- "--num-executors ", args.numExecutors.toString)
+ "--num-executors ", args.numExecutors.toString,
+ "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
// Command for the ApplicationMaster
val commands = prefixEnv ++ Seq(
@@ -857,6 +912,22 @@ private[spark] class Client(
}
}
}
+
+ private def findPySparkArchives(): Seq[String] = {
+ sys.env.get("PYSPARK_ARCHIVES_PATH")
+ .map(_.split(",").toSeq)
+ .getOrElse {
+ val pyLibPath = Seq(sys.env("SPARK_HOME"), "python", "lib").mkString(File.separator)
+ val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+ require(pyArchivesFile.exists(),
+ "pyspark.zip not found; cannot run pyspark application in YARN mode.")
+ val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+ require(py4jFile.exists(),
+ "py4j-0.8.2.1-src.zip not found; cannot run pyspark application in YARN mode.")
+ Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
+ }
+ }
+
}
object Client extends Logging {
@@ -907,8 +978,14 @@ object Client extends Logging {
// Distribution-defined classpath to add to processes
val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
- // Subdirectory where the user's hadoop config files will be placed.
- val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+ // Subdirectory where the user's Spark and Hadoop config files will be placed.
+ val LOCALIZED_CONF_DIR = "__spark_conf__"
+
+ // Name of the file in the conf archive containing Spark configuration.
+ val SPARK_CONF_FILE = "__spark_conf__.properties"
+
+ // Subdirectory where the user's python files (not archives) will be placed.
+ val LOCALIZED_PYTHON_DIR = "__pyfiles__"
/**
* Find the user-defined Spark jar if configured, or return the jar containing this
@@ -1033,7 +1110,7 @@ object Client extends Logging {
if (isAM) {
addClasspathEntry(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
- LOCALIZED_HADOOP_CONF_DIR, env)
+ LOCALIZED_CONF_DIR, env)
}
if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
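The createConfArchive rework above zips the Hadoop config files together with a serialized copy of SparkConf into a single __spark_conf__ archive that is localized only for the AM. A self-contained sketch of that zip-building idea (entry and file names are assumptions for illustration):

import java.io.{File, FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties
import java.util.zip.{ZipEntry, ZipOutputStream}

object ConfArchiveSketch {
  // Zip a set of readable config files and append a generated properties file,
  // so one archive can carry both Hadoop config and Spark configuration.
  def create(confFiles: Seq[File], sparkProps: Map[String, String], out: File): File = {
    val zip = new ZipOutputStream(new FileOutputStream(out))
    try {
      confFiles.filter(_.canRead()).foreach { f =>
        zip.putNextEntry(new ZipEntry(f.getName))
        java.nio.file.Files.copy(f.toPath, zip)
        zip.closeEntry()
      }
      val props = new Properties()
      sparkProps.foreach { case (k, v) => props.setProperty(k, v) }
      zip.putNextEntry(new ZipEntry("__spark_conf__.properties"))
      val writer = new OutputStreamWriter(zip, UTF_8)
      props.store(writer, "Spark configuration.")
      writer.flush()  // flush, but do not close, so the zip stream stays open
      zip.closeEntry()
    } finally {
      zip.close()
    }
    out
  }
}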
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 9c7b1b3988..35e990602a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -30,7 +30,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var archives: String = null
var userJar: String = null
var userClass: String = null
- var pyFiles: String = null
+ var pyFiles: Seq[String] = Nil
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
@@ -228,7 +228,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
args = tail
case ("--py-files") :: value :: tail =>
- pyFiles = value
+ pyFiles = value.split(",")
args = tail
case ("--files") :: value :: tail =>
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 99c05329b4..1c8d7ec576 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -76,7 +76,8 @@ private[spark] class YarnClientSchedulerBackend(
("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
- ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue")
+ ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+ ("--py-files", null, "spark.submit.pyFiles")
)
// Warn against the following deprecated environment variables: env var -> suggestion
val deprecatedEnvVars = Map(
@@ -86,7 +87,7 @@ private[spark] class YarnClientSchedulerBackend(
optionTuples.foreach { case (optionName, envVar, sparkProp) =>
if (sc.getConf.contains(sparkProp)) {
extraArgs += (optionName, sc.getConf.get(sparkProp))
- } else if (System.getenv(envVar) != null) {
+ } else if (envVar != null && System.getenv(envVar) != null) {
extraArgs += (optionName, System.getenv(envVar))
if (deprecatedEnvVars.contains(envVar)) {
logWarning(s"NOTE: $envVar is deprecated. Use ${deprecatedEnvVars(envVar)} instead.")
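The YarnClientSchedulerBackend change adds an option tuple whose environment-variable slot is null, so the legacy env-var lookup has to be guarded. A minimal sketch of that guarded forwarding pattern (names are illustrative):

object OptionForwardSketch {
  // Forward each option from the Spark config if present, otherwise from the legacy env var;
  // a null env var means there is no environment fallback for that option.
  def buildExtraArgs(
      conf: Map[String, String],
      env: Map[String, String],
      optionTuples: Seq[(String, String, String)]): Seq[String] = {
    optionTuples.flatMap { case (optionName, envVar, sparkProp) =>
      conf.get(sparkProp) match {
        case Some(v) => Seq(optionName, v)
        case None if envVar != null && env.contains(envVar) => Seq(optionName, env(envVar))
        case None => Nil
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val tuples = Seq(("--py-files", null, "spark.submit.pyFiles"))
    println(buildExtraArgs(Map("spark.submit.pyFiles" -> "deps.zip"), sys.env, tuples))
  }
}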
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 01d33c9ce9..4ec976aa31 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -113,7 +113,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
Environment.PWD.$()
}
cp should contain(pwdVar)
- cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
+ cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
}
@@ -129,7 +129,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
val tempDir = Utils.createTempDir()
try {
- client.prepareLocalResources(tempDir.getAbsolutePath())
+ client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER))
// The non-local path should be propagated by name only, since it will end up in the app's
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 93d587d0cb..a0f25ba450 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -56,6 +56,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
""".stripMargin
private val TEST_PYFILE = """
+ |import mod1, mod2
|import sys
|from operator import add
|
@@ -67,7 +68,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
| sc = SparkContext(conf=SparkConf())
| status = open(sys.argv[1],'w')
| result = "failure"
- | rdd = sc.parallelize(range(10))
+ | rdd = sc.parallelize(range(10)).map(lambda x: x * mod1.func() * mod2.func())
| cnt = rdd.count()
| if cnt == 10:
| result = "success"
@@ -76,6 +77,11 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
| sc.stop()
""".stripMargin
+ private val TEST_PYMODULE = """
+ |def func():
+ | return 42
+ """.stripMargin
+
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
@@ -124,7 +130,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
- hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+ hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR)
assert(hadoopConfDir.mkdir())
File.createTempFile("token", ".txt", hadoopConfDir)
}
@@ -151,26 +157,12 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
}
}
- // Enable this once fix SPARK-6700
- test("run Python application in yarn-cluster mode") {
- val primaryPyFile = new File(tempDir, "test.py")
- Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
- val pyFile = new File(tempDir, "test2.py")
- Files.write(TEST_PYFILE, pyFile, UTF_8)
- var result = File.createTempFile("result", null, tempDir)
+ test("run Python application in yarn-client mode") {
+ testPySpark(true)
+ }
- // The sbt assembly does not include pyspark / py4j python dependencies, so we need to
- // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala.
- val sparkHome = sys.props("spark.test.home")
- val extraConf = Map(
- "spark.executorEnv.SPARK_HOME" -> sparkHome,
- "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
-
- runSpark(false, primaryPyFile.getAbsolutePath(),
- sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()),
- appArgs = Seq(result.getAbsolutePath()),
- extraConf = extraConf)
- checkResult(result)
+ test("run Python application in yarn-cluster mode") {
+ testPySpark(false)
}
test("user class path first in client mode") {
@@ -188,6 +180,33 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher
checkResult(result)
}
+ private def testPySpark(clientMode: Boolean): Unit = {
+ val primaryPyFile = new File(tempDir, "test.py")
+ Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
+
+ val moduleDir =
+ if (clientMode) {
+ // In client-mode, .py files added with --py-files are not visible in the driver.
+ // This is something that the launcher library would have to handle.
+ tempDir
+ } else {
+ val subdir = new File(tempDir, "pyModules")
+ subdir.mkdir()
+ subdir
+ }
+ val pyModule = new File(moduleDir, "mod1.py")
+ Files.write(TEST_PYMODULE, pyModule, UTF_8)
+
+ val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir)
+ val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
+ val result = File.createTempFile("result", null, tempDir)
+
+ runSpark(clientMode, primaryPyFile.getAbsolutePath(),
+ sparkArgs = Seq("--py-files", pyFiles),
+ appArgs = Seq(result.getAbsolutePath()))
+ checkResult(result)
+ }
+
private def testUseClassPathFirst(clientMode: Boolean): Unit = {
// Create a jar file that contains a different version of "test.resource".
val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)