From 20a6013106b56a1a1cc3e8cda092330ffbe77cc3 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 9 Feb 2015 21:17:06 -0800 Subject: [SPARK-2996] Implement userClassPathFirst for driver, yarn. Yarn's config option `spark.yarn.user.classpath.first` does not work the same way as `spark.files.userClassPathFirst`; Yarn's version is a lot more dangerous, in that it modifies the system classpath, instead of restricting the changes to the user's class loader. So this change implements the behavior of the latter for Yarn, and deprecates the more dangerous choice. To be able to achieve feature-parity, I also implemented the option for drivers (the existing option only applies to executors). So now there are two options, each controlling whether to apply userClassPathFirst to the driver or executors. The old option was deprecated, and aliased to the new one (`spark.executor.userClassPathFirst`). The existing "child-first" class loader also had to be fixed. It didn't handle resources, and it was also doing some things that ended up causing JVM errors depending on how things were being called. Author: Marcelo Vanzin Closes #3233 from vanzin/SPARK-2996 and squashes the following commits: 9cf9cf1 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a1499e2 [Marcelo Vanzin] Remove SPARK_HOME propagation. fa7df88 [Marcelo Vanzin] Remove 'test.resource' file, create it dynamically. a8c69f1 [Marcelo Vanzin] Review feedback. cabf962 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a1b8d7e [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 3f768e3 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 2ce3c7a [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 0e6d6be [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 70d4044 [Marcelo Vanzin] Fix pyspark/yarn-cluster test. 0fe7777 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 0e6ef19 [Marcelo Vanzin] Move class loaders around and make names more meaninful. fe970a7 [Marcelo Vanzin] Review feedback. 25d4fed [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 3cb6498 [Marcelo Vanzin] Call the right loadClass() method on the parent. fbb8ab5 [Marcelo Vanzin] Add locking in loadClass() to avoid deadlocks. 2e6c4b7 [Marcelo Vanzin] Mention new setting in documentation. b6497f9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a10f379 [Marcelo Vanzin] Some feedback. 3730151 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 f513871 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 44010b6 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 7b57cba [Marcelo Vanzin] Remove now outdated message. 5304d64 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 35949c8 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 54e1a98 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 d1273b2 [Marcelo Vanzin] Add test file to rat exclude. fa1aafa [Marcelo Vanzin] Remove write check on user jars. 89d8072 [Marcelo Vanzin] Cleanups. a963ea3 [Marcelo Vanzin] Implement spark.driver.userClassPathFirst for standalone cluster mode. 50afa5f [Marcelo Vanzin] Fix Yarn executor command line. 7d14397 [Marcelo Vanzin] Register user jars in executor up front. 7f8603c [Marcelo Vanzin] Fix yarn-cluster mode without userClassPathFirst. 20373f5 [Marcelo Vanzin] Fix ClientBaseSuite. 55c88fa [Marcelo Vanzin] Run all Yarn integration tests via spark-submit. 0b64d92 [Marcelo Vanzin] Add deprecation warning to yarn option. 4a84d87 [Marcelo Vanzin] Fix the child-first class loader. d0394b8 [Marcelo Vanzin] Add "deprecated configs" to SparkConf. 46d8cf2 [Marcelo Vanzin] Update doc with new option, change name to "userClassPathFirst". a314f2d [Marcelo Vanzin] Enable driver class path isolation in SparkSubmit. 91f7e54 [Marcelo Vanzin] [yarn] Enable executor class path isolation. a853e74 [Marcelo Vanzin] Re-work CoarseGrainedExecutorBackend command line arguments. 89522ef [Marcelo Vanzin] Add class path isolation support for Yarn cluster mode. --- .../spark/deploy/yarn/ApplicationMaster.scala | 25 +- .../org/apache/spark/deploy/yarn/Client.scala | 133 +++++----- .../spark/deploy/yarn/ExecutorRunnable.scala | 25 +- yarn/src/test/resources/log4j.properties | 4 +- .../org/apache/spark/deploy/yarn/ClientSuite.scala | 6 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 276 ++++++++++++++------- 6 files changed, 301 insertions(+), 168 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 4cc320c5d5..a9bf861d16 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -19,9 +19,9 @@ package org.apache.spark.deploy.yarn import scala.util.control.NonFatal -import java.io.IOException +import java.io.{File, IOException} import java.lang.reflect.InvocationTargetException -import java.net.Socket +import java.net.{Socket, URL} import java.util.concurrent.atomic.AtomicReference import akka.actor._ @@ -38,7 +38,8 @@ import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil} import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.YarnSchedulerBackend import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} +import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, + SignalLogger, Utils} /** * Common application master functionality for Spark on Yarn. @@ -244,7 +245,6 @@ private[spark] class ApplicationMaster( host: String, port: String, isClusterMode: Boolean): Unit = { - val driverUrl = AkkaUtils.address( AkkaUtils.protocol(actorSystem), SparkEnv.driverActorSystemName, @@ -453,12 +453,24 @@ private[spark] class ApplicationMaster( private def startUserApplication(): Thread = { logInfo("Starting the user application in a separate Thread") System.setProperty("spark.executor.instances", args.numExecutors.toString) + + val classpath = Client.getUserClasspath(sparkConf) + val urls = classpath.map { entry => + new URL("file:" + new File(entry.getPath()).getAbsolutePath()) + } + val userClassLoader = + if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) { + new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader) + } else { + new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) + } + if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { System.setProperty("spark.submit.pyFiles", PythonRunner.formatPaths(args.pyFiles).mkString(",")) } - val mainMethod = Class.forName(args.userClass, false, - Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) + val mainMethod = userClassLoader.loadClass(args.userClass) + .getMethod("main", classOf[Array[String]]) val userThread = new Thread { override def run() { @@ -483,6 +495,7 @@ private[spark] class ApplicationMaster( } } } + userThread.setContextClassLoader(userClassLoader) userThread.setName("Driver") userThread.start() userThread diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8afc1ccdad..46d9df9348 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -183,8 +183,7 @@ private[spark] class Client( private[yarn] def copyFileToRemote( destDir: Path, srcPath: Path, - replication: Short, - setPerms: Boolean = false): Path = { + replication: Short): Path = { val destFs = destDir.getFileSystem(hadoopConf) val srcFs = srcPath.getFileSystem(hadoopConf) var destPath = srcPath @@ -193,9 +192,7 @@ private[spark] class Client( logInfo(s"Uploading resource $srcPath -> $destPath") FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) destFs.setReplication(destPath, replication) - if (setPerms) { - destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION)) - } + destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION)) } else { logInfo(s"Source and destination file systems are the same. Not copying $srcPath") } @@ -239,23 +236,22 @@ private[spark] class Client( /** * Copy the given main resource to the distributed cache if the scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. - * Each resource is represented by a 4-tuple of: + * Each resource is represented by a 3-tuple of: * (1) destination resource name, * (2) local path to the resource, - * (3) Spark property key to set if the scheme is not local, and - * (4) whether to set permissions for this resource + * (3) Spark property key to set if the scheme is not local */ List( - (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false), - (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true), - ("log4j.properties", oldLog4jConf.orNull, null, false) - ).foreach { case (destName, _localPath, confKey, setPermissions) => + (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR), + (APP_JAR, args.userJar, CONF_SPARK_USER_JAR), + ("log4j.properties", oldLog4jConf.orNull, null) + ).foreach { case (destName, _localPath, confKey) => val localPath: String = if (_localPath != null) _localPath.trim() else "" if (!localPath.isEmpty()) { val localURI = new URI(localPath) if (localURI.getScheme != LOCAL_SCHEME) { val src = getQualifiedLocalPath(localURI, hadoopConf) - val destPath = copyFileToRemote(dst, src, replication, setPermissions) + val destPath = copyFileToRemote(dst, src, replication) val destFs = FileSystem.get(destPath.toUri(), hadoopConf) distCacheMgr.addResource(destFs, hadoopConf, destPath, localResources, LocalResourceType.FILE, destName, statCache) @@ -707,7 +703,7 @@ object Client extends Logging { * Return the path to the given application's staging directory. */ private def getAppStagingDir(appId: ApplicationId): String = { - SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR + buildPath(SPARK_STAGING, appId.toString()) } /** @@ -783,7 +779,13 @@ object Client extends Logging { /** * Populate the classpath entry in the given environment map. - * This includes the user jar, Spark jar, and any extra application jars. + * + * User jars are generally not added to the JVM's system classpath; those are handled by the AM + * and executor backend. When the deprecated `spark.yarn.user.classpath.first` is used, user jars + * are included in the system classpath, though. The extra class path and other uploaded files are + * always made available through the system class path. + * + * @param args Client arguments (when starting the AM) or null (when starting executors). */ private[yarn] def populateClasspath( args: ClientArguments, @@ -795,48 +797,38 @@ object Client extends Logging { addClasspathEntry( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env ) - - // Normally the users app.jar is last in case conflicts with spark jars if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) { - addUserClasspath(args, sparkConf, env) - addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) - populateHadoopClasspath(conf, env) - } else { - addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) - populateHadoopClasspath(conf, env) - addUserClasspath(args, sparkConf, env) + val userClassPath = + if (args != null) { + getUserClasspath(Option(args.userJar), Option(args.addJars)) + } else { + getUserClasspath(sparkConf) + } + userClassPath.foreach { x => + addFileToClasspath(x, null, env) + } } - - // Append all jar files under the working directory to the classpath. - addClasspathEntry( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + "*", env - ) + addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env) + populateHadoopClasspath(conf, env) + sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env)) } /** - * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly - * to the classpath. + * Returns a list of URIs representing the user classpath. + * + * @param conf Spark configuration. */ - private def addUserClasspath( - args: ClientArguments, - conf: SparkConf, - env: HashMap[String, String]): Unit = { - - // If `args` is not null, we are launching an AM container. - // Otherwise, we are launching executor containers. - val (mainJar, secondaryJars) = - if (args != null) { - (args.userJar, args.addJars) - } else { - (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null)) - } + def getUserClasspath(conf: SparkConf): Array[URI] = { + getUserClasspath(conf.getOption(CONF_SPARK_USER_JAR), + conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) + } - addFileToClasspath(mainJar, APP_JAR, env) - if (secondaryJars != null) { - secondaryJars.split(",").filter(_.nonEmpty).foreach { jar => - addFileToClasspath(jar, null, env) - } - } + private def getUserClasspath( + mainJar: Option[String], + secondaryJars: Option[String]): Array[URI] = { + val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_)) + val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_)) + (mainUri ++ secondaryUris).toArray } /** @@ -847,27 +839,19 @@ object Client extends Logging { * * If not a "local:" file and no alternate name, the environment is not modified. * - * @param path Path to add to classpath (optional). + * @param uri URI to add to classpath (optional). * @param fileName Alternate name for the file (optional). * @param env Map holding the environment variables. */ private def addFileToClasspath( - path: String, + uri: URI, fileName: String, env: HashMap[String, String]): Unit = { - if (path != null) { - scala.util.control.Exception.ignoring(classOf[URISyntaxException]) { - val uri = new URI(path) - if (uri.getScheme == LOCAL_SCHEME) { - addClasspathEntry(uri.getPath, env) - return - } - } - } - if (fileName != null) { - addClasspathEntry( - YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + fileName, env - ) + if (uri != null && uri.getScheme == LOCAL_SCHEME) { + addClasspathEntry(uri.getPath, env) + } else if (fileName != null) { + addClasspathEntry(buildPath( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) } } @@ -963,4 +947,23 @@ object Client extends Logging { new Path(qualifiedURI) } + /** + * Whether to consider jars provided by the user to have precedence over the Spark jars when + * loading user classes. + */ + def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = { + if (isDriver) { + conf.getBoolean("spark.driver.userClassPathFirst", false) + } else { + conf.getBoolean("spark.executor.userClassPathFirst", false) + } + } + + /** + * Joins all the path components using Path.SEPARATOR. + */ + def buildPath(components: String*): String = { + components.mkString(Path.SEPARATOR) + } + } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 7cd8c5f0f9..6d5b8fda76 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.yarn +import java.io.File import java.net.URI import java.nio.ByteBuffer @@ -57,7 +58,7 @@ class ExecutorRunnable( var nmClient: NMClient = _ val yarnConf: YarnConfiguration = new YarnConfiguration(conf) lazy val env = prepareEnvironment(container) - + def run = { logInfo("Starting Executor Container") nmClient = NMClient.createNMClient() @@ -185,6 +186,16 @@ class ExecutorRunnable( // For log4j configuration to reference javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) + val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri => + val absPath = + if (new File(uri.getPath()).isAbsolute()) { + uri.getPath() + } else { + Client.buildPath(Environment.PWD.$(), uri.getPath()) + } + Seq("--user-class-path", "file:" + absPath) + }.toSeq + val commands = prefixEnv ++ Seq( YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server", @@ -196,11 +207,13 @@ class ExecutorRunnable( "-XX:OnOutOfMemoryError='kill %p'") ++ javaOpts ++ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", - masterAddress.toString, - slaveId.toString, - hostname.toString, - executorCores.toString, - appId, + "--driver-url", masterAddress.toString, + "--executor-id", slaveId.toString, + "--hostname", hostname.toString, + "--cores", executorCores.toString, + "--app-id", appId) ++ + userClassPath ++ + Seq( "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties index 287c8e3563..aab41fa494 100644 --- a/yarn/src/test/resources/log4j.properties +++ b/yarn/src/test/resources/log4j.properties @@ -16,7 +16,7 @@ # # Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file +log4j.rootCategory=DEBUG, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=true log4j.appender.file.file=target/unit-tests.log @@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{ # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.eclipse.jetty=WARN -org.eclipse.jetty.LEVEL=WARN +log4j.logger.org.apache.hadoop=WARN diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 2bb3dcffd6..f8f8129d22 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -82,6 +82,7 @@ class ClientSuite extends FunSuite with Matchers { test("Local jar URIs") { val conf = new Configuration() val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK) + .set("spark.yarn.user.classpath.first", "true") val env = new MutableHashMap[String, String]() val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) @@ -98,13 +99,10 @@ class ClientSuite extends FunSuite with Matchers { }) if (classOf[Environment].getMethods().exists(_.getName == "$$")) { cp should contain("{{PWD}}") - cp should contain(s"{{PWD}}${Path.SEPARATOR}*") } else if (Utils.isWindows) { cp should contain("%PWD%") - cp should contain(s"%PWD%${Path.SEPARATOR}*") } else { cp should contain(Environment.PWD.$()) - cp should contain(s"${Environment.PWD.$()}${File.separator}*") } cp should not contain (Client.SPARK_JAR) cp should not contain (Client.APP_JAR) @@ -117,7 +115,7 @@ class ClientSuite extends FunSuite with Matchers { val client = spy(new Client(args, conf, sparkConf)) doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), - any(classOf[Path]), anyShort(), anyBoolean()) + any(classOf[Path]), anyShort()) val tempDir = Utils.createTempDir() try { diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index e39de82740..0e37276ba7 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -17,27 +17,34 @@ package org.apache.spark.deploy.yarn -import java.io.File +import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.util.Properties import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ import scala.collection.mutable -import com.google.common.base.Charsets +import com.google.common.base.Charsets.UTF_8 +import com.google.common.io.ByteStreams import com.google.common.io.Files import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.util.Utils +/** + * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN + * applications, and require the Spark assembly to be built before they can be successfully + * run. + */ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { - // log4j configuration for the Yarn containers, so that their output is collected - // by Yarn instead of trying to overwrite unit-tests.log. + // log4j configuration for the YARN containers, so that their output is collected + // by YARN instead of trying to overwrite unit-tests.log. private val LOG4J_CONF = """ |log4j.rootCategory=DEBUG, console |log4j.appender.console=org.apache.log4j.ConsoleAppender @@ -52,13 +59,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit | |from pyspark import SparkConf , SparkContext |if __name__ == "__main__": - | if len(sys.argv) != 3: - | print >> sys.stderr, "Usage: test.py [master] [result file]" + | if len(sys.argv) != 2: + | print >> sys.stderr, "Usage: test.py [result file]" | exit(-1) - | conf = SparkConf() - | conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode") - | sc = SparkContext(conf=conf) - | status = open(sys.argv[2],'w') + | sc = SparkContext(conf=SparkConf()) + | status = open(sys.argv[1],'w') | result = "failure" | rdd = sc.parallelize(range(10)) | cnt = rdd.count() @@ -72,23 +77,17 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit private var yarnCluster: MiniYARNCluster = _ private var tempDir: File = _ private var fakeSparkJar: File = _ - private var oldConf: Map[String, String] = _ + private var logConfDir: File = _ override def beforeAll() { super.beforeAll() tempDir = Utils.createTempDir() - - val logConfDir = new File(tempDir, "log4j") + logConfDir = new File(tempDir, "log4j") logConfDir.mkdir() val logConfFile = new File(logConfDir, "log4j.properties") - Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8) - - val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator + - sys.props("java.class.path") - - oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap + Files.write(LOG4J_CONF, logConfFile, UTF_8) yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1) yarnCluster.init(new YarnConfiguration()) @@ -119,99 +118,165 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}") - config.foreach { e => - sys.props += ("spark.hadoop." + e.getKey() -> e.getValue()) - } fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) - val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) - sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome) - sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome) - sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath())) - sys.props += ("spark.executor.instances" -> "1") - sys.props += ("spark.driver.extraClassPath" -> childClasspath) - sys.props += ("spark.executor.extraClassPath" -> childClasspath) - sys.props += ("spark.executor.extraJavaOptions" -> "-Dfoo=\"one two three\"") - sys.props += ("spark.driver.extraJavaOptions" -> "-Dfoo=\"one two three\"") } override def afterAll() { yarnCluster.stop() - sys.props.retain { case (k, v) => !k.startsWith("spark.") } - sys.props ++= oldConf super.afterAll() } test("run Spark in yarn-client mode") { - var result = File.createTempFile("result", null, tempDir) - YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath())) - checkResult(result) - - // verify log urls are present - YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => - assert(info.logUrlMap.nonEmpty) - } + testBasicYarnApp(true) } test("run Spark in yarn-cluster mode") { - val main = YarnClusterDriver.getClass.getName().stripSuffix("$") - var result = File.createTempFile("result", null, tempDir) - - val args = Array("--class", main, - "--jar", "file:" + fakeSparkJar.getAbsolutePath(), - "--arg", "yarn-cluster", - "--arg", result.getAbsolutePath(), - "--num-executors", "1") - Client.main(args) - checkResult(result) - - // verify log urls are present. - YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => - assert(info.logUrlMap.nonEmpty) - } + testBasicYarnApp(false) } test("run Spark in yarn-cluster mode unsuccessfully") { - val main = YarnClusterDriver.getClass.getName().stripSuffix("$") - - // Use only one argument so the driver will fail - val args = Array("--class", main, - "--jar", "file:" + fakeSparkJar.getAbsolutePath(), - "--arg", "yarn-cluster", - "--num-executors", "1") + // Don't provide arguments so the driver will fail. val exception = intercept[SparkException] { - Client.main(args) + runSpark(false, mainClassName(YarnClusterDriver.getClass)) + fail("Spark application should have failed.") } - assert(Utils.exceptionString(exception).contains("Application finished with failed status")) } test("run Python application in yarn-cluster mode") { val primaryPyFile = new File(tempDir, "test.py") - Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8) + Files.write(TEST_PYFILE, primaryPyFile, UTF_8) val pyFile = new File(tempDir, "test2.py") - Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8) + Files.write(TEST_PYFILE, pyFile, UTF_8) var result = File.createTempFile("result", null, tempDir) - val args = Array("--class", "org.apache.spark.deploy.PythonRunner", - "--primary-py-file", primaryPyFile.getAbsolutePath(), - "--py-files", pyFile.getAbsolutePath(), - "--arg", "yarn-cluster", - "--arg", result.getAbsolutePath(), - "--name", "python test in yarn-cluster mode", - "--num-executors", "1") - Client.main(args) + // The sbt assembly does not include pyspark / py4j python dependencies, so we need to + // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala. + val sparkHome = sys.props("spark.test.home") + val extraConf = Map( + "spark.executorEnv.SPARK_HOME" -> sparkHome, + "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome) + + runSpark(false, primaryPyFile.getAbsolutePath(), + sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()), + appArgs = Seq(result.getAbsolutePath()), + extraConf = extraConf) checkResult(result) } + test("user class path first in client mode") { + testUseClassPathFirst(true) + } + + test("user class path first in cluster mode") { + testUseClassPathFirst(false) + } + + private def testBasicYarnApp(clientMode: Boolean): Unit = { + var result = File.createTempFile("result", null, tempDir) + runSpark(clientMode, mainClassName(YarnClusterDriver.getClass), + appArgs = Seq(result.getAbsolutePath())) + checkResult(result) + } + + private def testUseClassPathFirst(clientMode: Boolean): Unit = { + // Create a jar file that contains a different version of "test.resource". + val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) + val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir) + val driverResult = File.createTempFile("driver", null, tempDir) + val executorResult = File.createTempFile("executor", null, tempDir) + runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), + appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()), + extraClassPath = Seq(originalJar.getPath()), + extraJars = Seq("local:" + userJar.getPath()), + extraConf = Map( + "spark.driver.userClassPathFirst" -> "true", + "spark.executor.userClassPathFirst" -> "true")) + checkResult(driverResult, "OVERRIDDEN") + checkResult(executorResult, "OVERRIDDEN") + } + + private def runSpark( + clientMode: Boolean, + klass: String, + appArgs: Seq[String] = Nil, + sparkArgs: Seq[String] = Nil, + extraClassPath: Seq[String] = Nil, + extraJars: Seq[String] = Nil, + extraConf: Map[String, String] = Map()): Unit = { + val master = if (clientMode) "yarn-client" else "yarn-cluster" + val props = new Properties() + + props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath()) + + val childClasspath = logConfDir.getAbsolutePath() + + File.pathSeparator + + sys.props("java.class.path") + + File.pathSeparator + + extraClassPath.mkString(File.pathSeparator) + props.setProperty("spark.driver.extraClassPath", childClasspath) + props.setProperty("spark.executor.extraClassPath", childClasspath) + + // SPARK-4267: make sure java options are propagated correctly. + props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"") + props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"") + + yarnCluster.getConfig().foreach { e => + props.setProperty("spark.hadoop." + e.getKey(), e.getValue()) + } + + sys.props.foreach { case (k, v) => + if (k.startsWith("spark.")) { + props.setProperty(k, v) + } + } + + extraConf.foreach { case (k, v) => props.setProperty(k, v) } + + val propsFile = File.createTempFile("spark", ".properties", tempDir) + val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8) + props.store(writer, "Spark properties.") + writer.close() + + val extraJarArgs = if (!extraJars.isEmpty()) Seq("--jars", extraJars.mkString(",")) else Nil + val mainArgs = + if (klass.endsWith(".py")) { + Seq(klass) + } else { + Seq("--class", klass, fakeSparkJar.getAbsolutePath()) + } + val argv = + Seq( + new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(), + "--master", master, + "--num-executors", "1", + "--properties-file", propsFile.getAbsolutePath()) ++ + extraJarArgs ++ + sparkArgs ++ + mainArgs ++ + appArgs + + Utils.executeAndGetOutput(argv, + extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath())) + } + /** * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide * any sort of error when the job process finishes successfully, but the job itself fails. So * the tests enforce that something is written to a file after everything is ok to indicate * that the job succeeded. */ - private def checkResult(result: File) = { - var resultString = Files.toString(result, Charsets.UTF_8) - resultString should be ("success") + private def checkResult(result: File): Unit = { + checkResult(result, "success") + } + + private def checkResult(result: File, expected: String): Unit = { + var resultString = Files.toString(result, UTF_8) + resultString should be (expected) + } + + private def mainClassName(klass: Class[_]): String = { + klass.getName().stripSuffix("$") } } @@ -229,22 +294,22 @@ private object YarnClusterDriver extends Logging with Matchers { val WAIT_TIMEOUT_MILLIS = 10000 var listener: SaveExecutorInfo = null - def main(args: Array[String]) = { - if (args.length != 2) { + def main(args: Array[String]): Unit = { + if (args.length != 1) { System.err.println( s""" |Invalid command line: ${args.mkString(" ")} | - |Usage: YarnClusterDriver [master] [result file] + |Usage: YarnClusterDriver [result file] """.stripMargin) System.exit(1) } listener = new SaveExecutorInfo - val sc = new SparkContext(new SparkConf().setMaster(args(0)) + val sc = new SparkContext(new SparkConf() .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) sc.addSparkListener(listener) - val status = new File(args(1)) + val status = new File(args(0)) var result = "failure" try { val data = sc.parallelize(1 to 4, 4).collect().toSet @@ -253,7 +318,48 @@ private object YarnClusterDriver extends Logging with Matchers { result = "success" } finally { sc.stop() - Files.write(result, status, Charsets.UTF_8) + Files.write(result, status, UTF_8) + } + + // verify log urls are present + listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } + } + +} + +private object YarnClasspathTest { + + def main(args: Array[String]): Unit = { + if (args.length != 2) { + System.err.println( + s""" + |Invalid command line: ${args.mkString(" ")} + | + |Usage: YarnClasspathTest [driver result file] [executor result file] + """.stripMargin) + System.exit(1) + } + + readResource(args(0)) + val sc = new SparkContext(new SparkConf()) + try { + sc.parallelize(Seq(1)).foreach { x => readResource(args(1)) } + } finally { + sc.stop() + } + } + + private def readResource(resultPath: String): Unit = { + var result = "failure" + try { + val ccl = Thread.currentThread().getContextClassLoader() + val resource = ccl.getResourceAsStream("test.resource") + val bytes = ByteStreams.toByteArray(resource) + result = new String(bytes, 0, bytes.length, UTF_8) + } finally { + Files.write(result, new File(resultPath), UTF_8) } } -- cgit v1.2.3