author    Marcelo Vanzin <vanzin@cloudera.com>    2015-02-09 21:17:06 -0800
committer Andrew Or <andrew@databricks.com>       2015-02-09 21:17:28 -0800
commit    20a6013106b56a1a1cc3e8cda092330ffbe77cc3 (patch)
tree      381068b96cbb3278ce4c2fd50bbf185fe1d6c3e9 /yarn
parent    36c4e1d75933dc843acb747b91dc12e75ad1df42 (diff)
download  spark-20a6013106b56a1a1cc3e8cda092330ffbe77cc3.tar.gz
          spark-20a6013106b56a1a1cc3e8cda092330ffbe77cc3.tar.bz2
          spark-20a6013106b56a1a1cc3e8cda092330ffbe77cc3.zip
[SPARK-2996] Implement userClassPathFirst for driver, yarn.
Yarn's config option `spark.yarn.user.classpath.first` does not work the same way as `spark.files.userClassPathFirst`; Yarn's version is a lot more dangerous, in that it modifies the system classpath, instead of restricting the changes to the user's class loader. So this change implements the behavior of the latter for Yarn, and deprecates the more dangerous choice.

To be able to achieve feature-parity, I also implemented the option for drivers (the existing option only applies to executors). So now there are two options, each controlling whether to apply userClassPathFirst to the driver or executors. The old option was deprecated, and aliased to the new one (`spark.executor.userClassPathFirst`).

The existing "child-first" class loader also had to be fixed. It didn't handle resources, and it was also doing some things that ended up causing JVM errors depending on how things were being called.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #3233 from vanzin/SPARK-2996 and squashes the following commits:

9cf9cf1 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
a1499e2 [Marcelo Vanzin] Remove SPARK_HOME propagation.
fa7df88 [Marcelo Vanzin] Remove 'test.resource' file, create it dynamically.
a8c69f1 [Marcelo Vanzin] Review feedback.
cabf962 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
a1b8d7e [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
3f768e3 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
2ce3c7a [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
0e6d6be [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
70d4044 [Marcelo Vanzin] Fix pyspark/yarn-cluster test.
0fe7777 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
0e6ef19 [Marcelo Vanzin] Move class loaders around and make names more meaningful.
fe970a7 [Marcelo Vanzin] Review feedback.
25d4fed [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
3cb6498 [Marcelo Vanzin] Call the right loadClass() method on the parent.
fbb8ab5 [Marcelo Vanzin] Add locking in loadClass() to avoid deadlocks.
2e6c4b7 [Marcelo Vanzin] Mention new setting in documentation.
b6497f9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
a10f379 [Marcelo Vanzin] Some feedback.
3730151 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
f513871 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
44010b6 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
7b57cba [Marcelo Vanzin] Remove now outdated message.
5304d64 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
35949c8 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
54e1a98 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996
d1273b2 [Marcelo Vanzin] Add test file to rat exclude.
fa1aafa [Marcelo Vanzin] Remove write check on user jars.
89d8072 [Marcelo Vanzin] Cleanups.
a963ea3 [Marcelo Vanzin] Implement spark.driver.userClassPathFirst for standalone cluster mode.
50afa5f [Marcelo Vanzin] Fix Yarn executor command line.
7d14397 [Marcelo Vanzin] Register user jars in executor up front.
7f8603c [Marcelo Vanzin] Fix yarn-cluster mode without userClassPathFirst.
20373f5 [Marcelo Vanzin] Fix ClientBaseSuite.
55c88fa [Marcelo Vanzin] Run all Yarn integration tests via spark-submit.
0b64d92 [Marcelo Vanzin] Add deprecation warning to yarn option.
4a84d87 [Marcelo Vanzin] Fix the child-first class loader.
d0394b8 [Marcelo Vanzin] Add "deprecated configs" to SparkConf.
46d8cf2 [Marcelo Vanzin] Update doc with new option, change name to "userClassPathFirst".
a314f2d [Marcelo Vanzin] Enable driver class path isolation in SparkSubmit.
91f7e54 [Marcelo Vanzin] [yarn] Enable executor class path isolation.
a853e74 [Marcelo Vanzin] Re-work CoarseGrainedExecutorBackend command line arguments.
89522ef [Marcelo Vanzin] Add class path isolation support for Yarn cluster mode.
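For illustration, a minimal sketch (not part of this patch) of how an application opts into the two new options through SparkConf; the app name is illustrative, and the deprecated spark.yarn.user.classpath.first is aliased to the executor-side key:

// Sketch: enabling user-class-path-first behavior via SparkConf. The two
// "userClassPathFirst" keys are the ones introduced by this change.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("classpath-isolation-example")
  .set("spark.driver.userClassPathFirst", "true")    // isolate the driver's user classes
  .set("spark.executor.userClassPathFirst", "true")  // isolate each executor's user classes
val sc = new SparkContext(conf)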
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala   25
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala             133
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala    25
-rw-r--r--  yarn/src/test/resources/log4j.properties                                    4
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala          6
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala   276
6 files changed, 301 insertions, 168 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4cc320c5d5..a9bf861d16 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.yarn
import scala.util.control.NonFatal
-import java.io.IOException
+import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException
-import java.net.Socket
+import java.net.{Socket, URL}
import java.util.concurrent.atomic.AtomicReference
import akka.actor._
@@ -38,7 +38,8 @@ import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader,
+ SignalLogger, Utils}
/**
* Common application master functionality for Spark on Yarn.
@@ -244,7 +245,6 @@ private[spark] class ApplicationMaster(
host: String,
port: String,
isClusterMode: Boolean): Unit = {
-
val driverUrl = AkkaUtils.address(
AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
@@ -453,12 +453,24 @@ private[spark] class ApplicationMaster(
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
System.setProperty("spark.executor.instances", args.numExecutors.toString)
+
+ val classpath = Client.getUserClasspath(sparkConf)
+ val urls = classpath.map { entry =>
+ new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+ }
+ val userClassLoader =
+ if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
+ new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+ } else {
+ new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+ }
+
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
System.setProperty("spark.submit.pyFiles",
PythonRunner.formatPaths(args.pyFiles).mkString(","))
}
- val mainMethod = Class.forName(args.userClass, false,
- Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+ val mainMethod = userClassLoader.loadClass(args.userClass)
+ .getMethod("main", classOf[Array[String]])
val userThread = new Thread {
override def run() {
@@ -483,6 +495,7 @@ private[spark] class ApplicationMaster(
}
}
}
+ userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread
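For readers unfamiliar with child-first loading, here is a minimal sketch of the idea behind the ChildFirstURLClassLoader used above; this is an illustration under stated assumptions, not Spark's actual implementation (which lives in org.apache.spark.util and handles more edge cases):

// Illustrative child-first loader: try the user's URLs before the parent,
// for both classes and resources.
import java.net.{URL, URLClassLoader}

class ChildFirstLoaderSketch(urls: Array[URL], realParent: ClassLoader)
  extends URLClassLoader(urls, null) {
  // A null parent above disables the JVM's default parent-first delegation,
  // so super.loadClass() only searches the user's URLs (plus bootstrap).

  override def loadClass(name: String, resolve: Boolean): Class[_] = synchronized {
    // Coarse-grained locking, standing in for the deadlock fix the commit
    // mentions ("Add locking in loadClass() to avoid deadlocks").
    try {
      super.loadClass(name, resolve)   // user jars first
    } catch {
      case _: ClassNotFoundException =>
        realParent.loadClass(name)     // then Spark / system classes
    }
  }

  // The old loader "didn't handle resources"; child-first resource lookup
  // must check the user's URLs before deferring to the parent.
  override def getResource(name: String): URL = {
    val url = findResource(name)
    if (url != null) url else realParent.getResource(name)
  }
}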
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8afc1ccdad..46d9df9348 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -183,8 +183,7 @@ private[spark] class Client(
private[yarn] def copyFileToRemote(
destDir: Path,
srcPath: Path,
- replication: Short,
- setPerms: Boolean = false): Path = {
+ replication: Short): Path = {
val destFs = destDir.getFileSystem(hadoopConf)
val srcFs = srcPath.getFileSystem(hadoopConf)
var destPath = srcPath
@@ -193,9 +192,7 @@ private[spark] class Client(
logInfo(s"Uploading resource $srcPath -> $destPath")
FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
destFs.setReplication(destPath, replication)
- if (setPerms) {
- destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
- }
+ destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
} else {
logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
}
@@ -239,23 +236,22 @@ private[spark] class Client(
/**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
- * Each resource is represented by a 4-tuple of:
+ * Each resource is represented by a 3-tuple of:
* (1) destination resource name,
* (2) local path to the resource,
- * (3) Spark property key to set if the scheme is not local, and
- * (4) whether to set permissions for this resource
+ * (3) Spark property key to set if the scheme is not local
*/
List(
- (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false),
- (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true),
- ("log4j.properties", oldLog4jConf.orNull, null, false)
- ).foreach { case (destName, _localPath, confKey, setPermissions) =>
+ (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
+ (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
+ ("log4j.properties", oldLog4jConf.orNull, null)
+ ).foreach { case (destName, _localPath, confKey) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
if (!localPath.isEmpty()) {
val localURI = new URI(localPath)
if (localURI.getScheme != LOCAL_SCHEME) {
val src = getQualifiedLocalPath(localURI, hadoopConf)
- val destPath = copyFileToRemote(dst, src, replication, setPermissions)
+ val destPath = copyFileToRemote(dst, src, replication)
val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
distCacheMgr.addResource(destFs, hadoopConf, destPath,
localResources, LocalResourceType.FILE, destName, statCache)
@@ -707,7 +703,7 @@ object Client extends Logging {
* Return the path to the given application's staging directory.
*/
private def getAppStagingDir(appId: ApplicationId): String = {
- SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+ buildPath(SPARK_STAGING, appId.toString())
}
/**
@@ -783,7 +779,13 @@ object Client extends Logging {
/**
* Populate the classpath entry in the given environment map.
- * This includes the user jar, Spark jar, and any extra application jars.
+ *
+ * User jars are generally not added to the JVM's system classpath; those are handled by the AM
+ * and executor backend. When the deprecated `spark.yarn.user.classpath.first` is used, user jars
+ * are included in the system classpath, though. The extra class path and other uploaded files are
+ * always made available through the system class path.
+ *
+ * @param args Client arguments (when starting the AM) or null (when starting executors).
*/
private[yarn] def populateClasspath(
args: ClientArguments,
@@ -795,48 +797,38 @@ object Client extends Logging {
addClasspathEntry(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
)
-
- // Normally the users app.jar is last in case conflicts with spark jars
if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
- addUserClasspath(args, sparkConf, env)
- addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
- populateHadoopClasspath(conf, env)
- } else {
- addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
- populateHadoopClasspath(conf, env)
- addUserClasspath(args, sparkConf, env)
+ val userClassPath =
+ if (args != null) {
+ getUserClasspath(Option(args.userJar), Option(args.addJars))
+ } else {
+ getUserClasspath(sparkConf)
+ }
+ userClassPath.foreach { x =>
+ addFileToClasspath(x, null, env)
+ }
}
-
- // Append all jar files under the working directory to the classpath.
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + "*", env
- )
+ addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+ populateHadoopClasspath(conf, env)
+ sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env))
}
/**
- * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
- * to the classpath.
+ * Returns a list of URIs representing the user classpath.
+ *
+ * @param conf Spark configuration.
*/
- private def addUserClasspath(
- args: ClientArguments,
- conf: SparkConf,
- env: HashMap[String, String]): Unit = {
-
- // If `args` is not null, we are launching an AM container.
- // Otherwise, we are launching executor containers.
- val (mainJar, secondaryJars) =
- if (args != null) {
- (args.userJar, args.addJars)
- } else {
- (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null))
- }
+ def getUserClasspath(conf: SparkConf): Array[URI] = {
+ getUserClasspath(conf.getOption(CONF_SPARK_USER_JAR),
+ conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+ }
- addFileToClasspath(mainJar, APP_JAR, env)
- if (secondaryJars != null) {
- secondaryJars.split(",").filter(_.nonEmpty).foreach { jar =>
- addFileToClasspath(jar, null, env)
- }
- }
+ private def getUserClasspath(
+ mainJar: Option[String],
+ secondaryJars: Option[String]): Array[URI] = {
+ val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_))
+ val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_))
+ (mainUri ++ secondaryUris).toArray
}
/**
@@ -847,27 +839,19 @@ object Client extends Logging {
*
* If not a "local:" file and no alternate name, the environment is not modified.
*
- * @param path Path to add to classpath (optional).
+ * @param uri URI to add to classpath (optional).
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
*/
private def addFileToClasspath(
- path: String,
+ uri: URI,
fileName: String,
env: HashMap[String, String]): Unit = {
- if (path != null) {
- scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
- val uri = new URI(path)
- if (uri.getScheme == LOCAL_SCHEME) {
- addClasspathEntry(uri.getPath, env)
- return
- }
- }
- }
- if (fileName != null) {
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + fileName, env
- )
+ if (uri != null && uri.getScheme == LOCAL_SCHEME) {
+ addClasspathEntry(uri.getPath, env)
+ } else if (fileName != null) {
+ addClasspathEntry(buildPath(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
}
}
@@ -963,4 +947,23 @@ object Client extends Logging {
new Path(qualifiedURI)
}
+ /**
+ * Whether to consider jars provided by the user to have precedence over the Spark jars when
+ * loading user classes.
+ */
+ def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = {
+ if (isDriver) {
+ conf.getBoolean("spark.driver.userClassPathFirst", false)
+ } else {
+ conf.getBoolean("spark.executor.userClassPathFirst", false)
+ }
+ }
+
+ /**
+ * Joins all the path components using Path.SEPARATOR.
+ */
+ def buildPath(components: String*): String = {
+ components.mkString(Path.SEPARATOR)
+ }
+
}
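The new Client helpers are small but easy to misread in diff form. A hedged sketch of their intended behavior, assuming same-package access to the configuration constants (as ClientSuite has); paths are illustrative:

// buildPath joins components with Hadoop's Path.SEPARATOR:
//   Client.buildPath("spark-staging", "application_1234_0001")
//     == "spark-staging/application_1234_0001"
//
// getUserClasspath returns the user's main jar plus any secondary jars as
// URIs; with nothing configured, it falls back to the APP_JAR alias, which
// containers resolve relative to their working directory.
import java.net.URI
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.Client

val conf = new SparkConf()
  .set(Client.CONF_SPARK_USER_JAR, "local:/opt/app/main.jar")
  .set(Client.CONF_SPARK_YARN_SECONDARY_JARS, "local:/opt/app/dep.jar")
val uris: Array[URI] = Client.getUserClasspath(conf)
// uris: Array(local:/opt/app/main.jar, local:/opt/app/dep.jar)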
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7cd8c5f0f9..6d5b8fda76 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy.yarn
+import java.io.File
import java.net.URI
import java.nio.ByteBuffer
@@ -57,7 +58,7 @@ class ExecutorRunnable(
var nmClient: NMClient = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
lazy val env = prepareEnvironment(container)
-
+
def run = {
logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
@@ -185,6 +186,16 @@ class ExecutorRunnable(
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+ val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
+ val absPath =
+ if (new File(uri.getPath()).isAbsolute()) {
+ uri.getPath()
+ } else {
+ Client.buildPath(Environment.PWD.$(), uri.getPath())
+ }
+ Seq("--user-class-path", "file:" + absPath)
+ }.toSeq
+
val commands = prefixEnv ++ Seq(
YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
"-server",
@@ -196,11 +207,13 @@ class ExecutorRunnable(
"-XX:OnOutOfMemoryError='kill %p'") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
- masterAddress.toString,
- slaveId.toString,
- hostname.toString,
- executorCores.toString,
- appId,
+ "--driver-url", masterAddress.toString,
+ "--executor-id", slaveId.toString,
+ "--hostname", hostname.toString,
+ "--cores", executorCores.toString,
+ "--app-id", appId) ++
+ userClassPath ++
+ Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties
index 287c8e3563..aab41fa494 100644
--- a/yarn/src/test/resources/log4j.properties
+++ b/yarn/src/test/resources/log4j.properties
@@ -16,7 +16,7 @@
#
# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+log4j.rootCategory=DEBUG, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
@@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
-org.eclipse.jetty.LEVEL=WARN
+log4j.logger.org.apache.hadoop=WARN
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 2bb3dcffd6..f8f8129d22 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -82,6 +82,7 @@ class ClientSuite extends FunSuite with Matchers {
test("Local jar URIs") {
val conf = new Configuration()
val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
+ .set("spark.yarn.user.classpath.first", "true")
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
@@ -98,13 +99,10 @@ class ClientSuite extends FunSuite with Matchers {
})
if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
cp should contain("{{PWD}}")
- cp should contain(s"{{PWD}}${Path.SEPARATOR}*")
} else if (Utils.isWindows) {
cp should contain("%PWD%")
- cp should contain(s"%PWD%${Path.SEPARATOR}*")
} else {
cp should contain(Environment.PWD.$())
- cp should contain(s"${Environment.PWD.$()}${File.separator}*")
}
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
@@ -117,7 +115,7 @@ class ClientSuite extends FunSuite with Matchers {
val client = spy(new Client(args, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), anyShort(), anyBoolean())
+ any(classOf[Path]), anyShort())
val tempDir = Utils.createTempDir()
try {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index e39de82740..0e37276ba7 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -17,27 +17,34 @@
package org.apache.spark.deploy.yarn
-import java.io.File
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.collection.mutable
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
+import com.google.common.io.ByteStreams
import com.google.common.io.Files
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.util.Utils
+/**
+ * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN
+ * applications, and require the Spark assembly to be built before they can be successfully
+ * run.
+ */
class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
- // log4j configuration for the Yarn containers, so that their output is collected
- // by Yarn instead of trying to overwrite unit-tests.log.
+ // log4j configuration for the YARN containers, so that their output is collected
+ // by YARN instead of trying to overwrite unit-tests.log.
private val LOG4J_CONF = """
|log4j.rootCategory=DEBUG, console
|log4j.appender.console=org.apache.log4j.ConsoleAppender
@@ -52,13 +59,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
|
|from pyspark import SparkConf , SparkContext
|if __name__ == "__main__":
- | if len(sys.argv) != 3:
- | print >> sys.stderr, "Usage: test.py [master] [result file]"
+ | if len(sys.argv) != 2:
+ | print >> sys.stderr, "Usage: test.py [result file]"
| exit(-1)
- | conf = SparkConf()
- | conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode")
- | sc = SparkContext(conf=conf)
- | status = open(sys.argv[2],'w')
+ | sc = SparkContext(conf=SparkConf())
+ | status = open(sys.argv[1],'w')
| result = "failure"
| rdd = sc.parallelize(range(10))
| cnt = rdd.count()
@@ -72,23 +77,17 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
- private var oldConf: Map[String, String] = _
+ private var logConfDir: File = _
override def beforeAll() {
super.beforeAll()
tempDir = Utils.createTempDir()
-
- val logConfDir = new File(tempDir, "log4j")
+ logConfDir = new File(tempDir, "log4j")
logConfDir.mkdir()
val logConfFile = new File(logConfDir, "log4j.properties")
- Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
-
- val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
- sys.props("java.class.path")
-
- oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
+ Files.write(LOG4J_CONF, logConfFile, UTF_8)
yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster.init(new YarnConfiguration())
@@ -119,99 +118,165 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
- config.foreach { e =>
- sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
- }
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
- val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
- sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
- sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome)
- sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
- sys.props += ("spark.executor.instances" -> "1")
- sys.props += ("spark.driver.extraClassPath" -> childClasspath)
- sys.props += ("spark.executor.extraClassPath" -> childClasspath)
- sys.props += ("spark.executor.extraJavaOptions" -> "-Dfoo=\"one two three\"")
- sys.props += ("spark.driver.extraJavaOptions" -> "-Dfoo=\"one two three\"")
}
override def afterAll() {
yarnCluster.stop()
- sys.props.retain { case (k, v) => !k.startsWith("spark.") }
- sys.props ++= oldConf
super.afterAll()
}
test("run Spark in yarn-client mode") {
- var result = File.createTempFile("result", null, tempDir)
- YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
- checkResult(result)
-
- // verify log urls are present
- YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
- assert(info.logUrlMap.nonEmpty)
- }
+ testBasicYarnApp(true)
}
test("run Spark in yarn-cluster mode") {
- val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
- var result = File.createTempFile("result", null, tempDir)
-
- val args = Array("--class", main,
- "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--arg", result.getAbsolutePath(),
- "--num-executors", "1")
- Client.main(args)
- checkResult(result)
-
- // verify log urls are present.
- YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
- assert(info.logUrlMap.nonEmpty)
- }
+ testBasicYarnApp(false)
}
test("run Spark in yarn-cluster mode unsuccessfully") {
- val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
-
- // Use only one argument so the driver will fail
- val args = Array("--class", main,
- "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--num-executors", "1")
+ // Don't provide arguments so the driver will fail.
val exception = intercept[SparkException] {
- Client.main(args)
+ runSpark(false, mainClassName(YarnClusterDriver.getClass))
+ fail("Spark application should have failed.")
}
- assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
}
test("run Python application in yarn-cluster mode") {
val primaryPyFile = new File(tempDir, "test.py")
- Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8)
+ Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
val pyFile = new File(tempDir, "test2.py")
- Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8)
+ Files.write(TEST_PYFILE, pyFile, UTF_8)
var result = File.createTempFile("result", null, tempDir)
- val args = Array("--class", "org.apache.spark.deploy.PythonRunner",
- "--primary-py-file", primaryPyFile.getAbsolutePath(),
- "--py-files", pyFile.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--arg", result.getAbsolutePath(),
- "--name", "python test in yarn-cluster mode",
- "--num-executors", "1")
- Client.main(args)
+ // The sbt assembly does not include pyspark / py4j python dependencies, so we need to
+ // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala.
+ val sparkHome = sys.props("spark.test.home")
+ val extraConf = Map(
+ "spark.executorEnv.SPARK_HOME" -> sparkHome,
+ "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
+
+ runSpark(false, primaryPyFile.getAbsolutePath(),
+ sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()),
+ appArgs = Seq(result.getAbsolutePath()),
+ extraConf = extraConf)
checkResult(result)
}
+ test("user class path first in client mode") {
+ testUseClassPathFirst(true)
+ }
+
+ test("user class path first in cluster mode") {
+ testUseClassPathFirst(false)
+ }
+
+ private def testBasicYarnApp(clientMode: Boolean): Unit = {
+ var result = File.createTempFile("result", null, tempDir)
+ runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
+ appArgs = Seq(result.getAbsolutePath()))
+ checkResult(result)
+ }
+
+ private def testUseClassPathFirst(clientMode: Boolean): Unit = {
+ // Create a jar file that contains a different version of "test.resource".
+ val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+ val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir)
+ val driverResult = File.createTempFile("driver", null, tempDir)
+ val executorResult = File.createTempFile("executor", null, tempDir)
+ runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+ appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+ extraClassPath = Seq(originalJar.getPath()),
+ extraJars = Seq("local:" + userJar.getPath()),
+ extraConf = Map(
+ "spark.driver.userClassPathFirst" -> "true",
+ "spark.executor.userClassPathFirst" -> "true"))
+ checkResult(driverResult, "OVERRIDDEN")
+ checkResult(executorResult, "OVERRIDDEN")
+ }
+
+ private def runSpark(
+ clientMode: Boolean,
+ klass: String,
+ appArgs: Seq[String] = Nil,
+ sparkArgs: Seq[String] = Nil,
+ extraClassPath: Seq[String] = Nil,
+ extraJars: Seq[String] = Nil,
+ extraConf: Map[String, String] = Map()): Unit = {
+ val master = if (clientMode) "yarn-client" else "yarn-cluster"
+ val props = new Properties()
+
+ props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+
+ val childClasspath = logConfDir.getAbsolutePath() +
+ File.pathSeparator +
+ sys.props("java.class.path") +
+ File.pathSeparator +
+ extraClassPath.mkString(File.pathSeparator)
+ props.setProperty("spark.driver.extraClassPath", childClasspath)
+ props.setProperty("spark.executor.extraClassPath", childClasspath)
+
+ // SPARK-4267: make sure java options are propagated correctly.
+ props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
+ props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")
+
+ yarnCluster.getConfig().foreach { e =>
+ props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
+ }
+
+ sys.props.foreach { case (k, v) =>
+ if (k.startsWith("spark.")) {
+ props.setProperty(k, v)
+ }
+ }
+
+ extraConf.foreach { case (k, v) => props.setProperty(k, v) }
+
+ val propsFile = File.createTempFile("spark", ".properties", tempDir)
+ val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8)
+ props.store(writer, "Spark properties.")
+ writer.close()
+
+ val extraJarArgs = if (!extraJars.isEmpty()) Seq("--jars", extraJars.mkString(",")) else Nil
+ val mainArgs =
+ if (klass.endsWith(".py")) {
+ Seq(klass)
+ } else {
+ Seq("--class", klass, fakeSparkJar.getAbsolutePath())
+ }
+ val argv =
+ Seq(
+ new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(),
+ "--master", master,
+ "--num-executors", "1",
+ "--properties-file", propsFile.getAbsolutePath()) ++
+ extraJarArgs ++
+ sparkArgs ++
+ mainArgs ++
+ appArgs
+
+ Utils.executeAndGetOutput(argv,
+ extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+ }
+
/**
* This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
* any sort of error when the job process finishes successfully, but the job itself fails. So
* the tests enforce that something is written to a file after everything is ok to indicate
* that the job succeeded.
*/
- private def checkResult(result: File) = {
- var resultString = Files.toString(result, Charsets.UTF_8)
- resultString should be ("success")
+ private def checkResult(result: File): Unit = {
+ checkResult(result, "success")
+ }
+
+ private def checkResult(result: File, expected: String): Unit = {
+ var resultString = Files.toString(result, UTF_8)
+ resultString should be (expected)
+ }
+
+ private def mainClassName(klass: Class[_]): String = {
+ klass.getName().stripSuffix("$")
}
}
@@ -229,22 +294,22 @@ private object YarnClusterDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000
var listener: SaveExecutorInfo = null
- def main(args: Array[String]) = {
- if (args.length != 2) {
+ def main(args: Array[String]): Unit = {
+ if (args.length != 1) {
System.err.println(
s"""
|Invalid command line: ${args.mkString(" ")}
|
- |Usage: YarnClusterDriver [master] [result file]
+ |Usage: YarnClusterDriver [result file]
""".stripMargin)
System.exit(1)
}
listener = new SaveExecutorInfo
- val sc = new SparkContext(new SparkConf().setMaster(args(0))
+ val sc = new SparkContext(new SparkConf()
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
sc.addSparkListener(listener)
- val status = new File(args(1))
+ val status = new File(args(0))
var result = "failure"
try {
val data = sc.parallelize(1 to 4, 4).collect().toSet
@@ -253,7 +318,48 @@ private object YarnClusterDriver extends Logging with Matchers {
result = "success"
} finally {
sc.stop()
- Files.write(result, status, Charsets.UTF_8)
+ Files.write(result, status, UTF_8)
+ }
+
+ // verify log urls are present
+ listener.addedExecutorInfos.values.foreach { info =>
+ assert(info.logUrlMap.nonEmpty)
+ }
+ }
+
+}
+
+private object YarnClasspathTest {
+
+ def main(args: Array[String]): Unit = {
+ if (args.length != 2) {
+ System.err.println(
+ s"""
+ |Invalid command line: ${args.mkString(" ")}
+ |
+ |Usage: YarnClasspathTest [driver result file] [executor result file]
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ readResource(args(0))
+ val sc = new SparkContext(new SparkConf())
+ try {
+ sc.parallelize(Seq(1)).foreach { x => readResource(args(1)) }
+ } finally {
+ sc.stop()
+ }
+ }
+
+ private def readResource(resultPath: String): Unit = {
+ var result = "failure"
+ try {
+ val ccl = Thread.currentThread().getContextClassLoader()
+ val resource = ccl.getResourceAsStream("test.resource")
+ val bytes = ByteStreams.toByteArray(resource)
+ result = new String(bytes, 0, bytes.length, UTF_8)
+ } finally {
+ Files.write(result, new File(resultPath), UTF_8)
}
}