Diffstat (limited to 'yarn')
-rw-r--r--   yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala |  25
-rw-r--r--   yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala            | 133
-rw-r--r--   yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala  |  25
-rw-r--r--   yarn/src/test/resources/log4j.properties                                 |   4
-rw-r--r--   yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala       |   6
-rw-r--r--   yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala  | 276
6 files changed, 301 insertions(+), 168 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4cc320c5d5..a9bf861d16 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.yarn
import scala.util.control.NonFatal
-import java.io.IOException
+import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException
-import java.net.Socket
+import java.net.{Socket, URL}
import java.util.concurrent.atomic.AtomicReference
import akka.actor._
@@ -38,7 +38,8 @@ import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader,
+ SignalLogger, Utils}
/**
* Common application master functionality for Spark on Yarn.
@@ -244,7 +245,6 @@ private[spark] class ApplicationMaster(
host: String,
port: String,
isClusterMode: Boolean): Unit = {
-
val driverUrl = AkkaUtils.address(
AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
@@ -453,12 +453,24 @@ private[spark] class ApplicationMaster(
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
System.setProperty("spark.executor.instances", args.numExecutors.toString)
+
+ val classpath = Client.getUserClasspath(sparkConf)
+ val urls = classpath.map { entry =>
+ new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+ }
+ val userClassLoader =
+ if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
+ new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+ } else {
+ new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+ }
+
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
System.setProperty("spark.submit.pyFiles",
PythonRunner.formatPaths(args.pyFiles).mkString(","))
}
- val mainMethod = Class.forName(args.userClass, false,
- Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+ val mainMethod = userClassLoader.loadClass(args.userClass)
+ .getMethod("main", classOf[Array[String]])
val userThread = new Thread {
override def run() {
@@ -483,6 +495,7 @@ private[spark] class ApplicationMaster(
}
}
}
+ userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread
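
The ApplicationMaster change above replaces Class.forName on the context class loader with a dedicated loader built from Client.getUserClasspath: child-first when spark.driver.userClassPathFirst is set, otherwise a loader that delegates to the Spark class path as usual. The following is a minimal, self-contained sketch of that policy; it does not use Spark's ChildFirstURLClassLoader or MutableURLClassLoader, and the jar path, flag, and class names are stand-ins for the demo.

    import java.io.File
    import java.net.{URL, URLClassLoader}

    // Stand-in for a child-first loader: bootstrap and the given URLs are tried
    // before falling back to the real parent (the Spark/system class path).
    class ChildFirstLoader(urls: Array[URL], realParent: ClassLoader)
      extends URLClassLoader(urls, null) {
      override def loadClass(name: String, resolve: Boolean): Class[_] =
        try {
          super.loadClass(name, resolve)
        } catch {
          case _: ClassNotFoundException => realParent.loadClass(name)
        }
    }

    object DriverLoaderSketch {
      // Mirrors the choice made in startUserApplication; the flag value and jar
      // list are assumptions for this demo, not read from a SparkConf.
      def loaderFor(jars: Seq[String], userClassPathFirst: Boolean): ClassLoader = {
        val urls = jars.map(j => new URL("file:" + new File(j).getAbsolutePath)).toArray
        val parent = Thread.currentThread().getContextClassLoader
        if (userClassPathFirst) new ChildFirstLoader(urls, parent)
        else new URLClassLoader(urls, parent)   // standard parent-first delegation
      }

      def main(args: Array[String]): Unit = {
        val loader = loaderFor(Seq("app.jar"), userClassPathFirst = true)
        // The AM then resolves the user's main() through this loader and makes it
        // the context class loader of the "Driver" thread.
        Thread.currentThread().setContextClassLoader(loader)
        println(loader.loadClass("java.lang.String"))  // resolved via the fallback chain
      }
    }
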
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8afc1ccdad..46d9df9348 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -183,8 +183,7 @@ private[spark] class Client(
private[yarn] def copyFileToRemote(
destDir: Path,
srcPath: Path,
- replication: Short,
- setPerms: Boolean = false): Path = {
+ replication: Short): Path = {
val destFs = destDir.getFileSystem(hadoopConf)
val srcFs = srcPath.getFileSystem(hadoopConf)
var destPath = srcPath
@@ -193,9 +192,7 @@ private[spark] class Client(
logInfo(s"Uploading resource $srcPath -> $destPath")
FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
destFs.setReplication(destPath, replication)
- if (setPerms) {
- destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
- }
+ destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
} else {
logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
}
@@ -239,23 +236,22 @@ private[spark] class Client(
/**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
- * Each resource is represented by a 4-tuple of:
+ * Each resource is represented by a 3-tuple of:
* (1) destination resource name,
* (2) local path to the resource,
- * (3) Spark property key to set if the scheme is not local, and
- * (4) whether to set permissions for this resource
+ * (3) Spark property key to set if the scheme is not local
*/
List(
- (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false),
- (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true),
- ("log4j.properties", oldLog4jConf.orNull, null, false)
- ).foreach { case (destName, _localPath, confKey, setPermissions) =>
+ (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
+ (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
+ ("log4j.properties", oldLog4jConf.orNull, null)
+ ).foreach { case (destName, _localPath, confKey) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
if (!localPath.isEmpty()) {
val localURI = new URI(localPath)
if (localURI.getScheme != LOCAL_SCHEME) {
val src = getQualifiedLocalPath(localURI, hadoopConf)
- val destPath = copyFileToRemote(dst, src, replication, setPermissions)
+ val destPath = copyFileToRemote(dst, src, replication)
val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
distCacheMgr.addResource(destFs, hadoopConf, destPath,
localResources, LocalResourceType.FILE, destName, statCache)
@@ -707,7 +703,7 @@ object Client extends Logging {
* Return the path to the given application's staging directory.
*/
private def getAppStagingDir(appId: ApplicationId): String = {
- SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+ buildPath(SPARK_STAGING, appId.toString())
}
/**
@@ -783,7 +779,13 @@ object Client extends Logging {
/**
* Populate the classpath entry in the given environment map.
- * This includes the user jar, Spark jar, and any extra application jars.
+ *
+ * User jars are generally not added to the JVM's system classpath; those are handled by the AM
+ * and executor backend. When the deprecated `spark.yarn.user.classpath.first` is used, user jars
+ * are included in the system classpath, though. The extra class path and other uploaded files are
+ * always made available through the system class path.
+ *
+ * @param args Client arguments (when starting the AM) or null (when starting executors).
*/
private[yarn] def populateClasspath(
args: ClientArguments,
@@ -795,48 +797,38 @@ object Client extends Logging {
addClasspathEntry(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
)
-
- // Normally the users app.jar is last in case conflicts with spark jars
if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
- addUserClasspath(args, sparkConf, env)
- addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
- populateHadoopClasspath(conf, env)
- } else {
- addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
- populateHadoopClasspath(conf, env)
- addUserClasspath(args, sparkConf, env)
+ val userClassPath =
+ if (args != null) {
+ getUserClasspath(Option(args.userJar), Option(args.addJars))
+ } else {
+ getUserClasspath(sparkConf)
+ }
+ userClassPath.foreach { x =>
+ addFileToClasspath(x, null, env)
+ }
}
-
- // Append all jar files under the working directory to the classpath.
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + "*", env
- )
+ addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+ populateHadoopClasspath(conf, env)
+ sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env))
}
/**
- * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
- * to the classpath.
+ * Returns a list of URIs representing the user classpath.
+ *
+ * @param conf Spark configuration.
*/
- private def addUserClasspath(
- args: ClientArguments,
- conf: SparkConf,
- env: HashMap[String, String]): Unit = {
-
- // If `args` is not null, we are launching an AM container.
- // Otherwise, we are launching executor containers.
- val (mainJar, secondaryJars) =
- if (args != null) {
- (args.userJar, args.addJars)
- } else {
- (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null))
- }
+ def getUserClasspath(conf: SparkConf): Array[URI] = {
+ getUserClasspath(conf.getOption(CONF_SPARK_USER_JAR),
+ conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+ }
- addFileToClasspath(mainJar, APP_JAR, env)
- if (secondaryJars != null) {
- secondaryJars.split(",").filter(_.nonEmpty).foreach { jar =>
- addFileToClasspath(jar, null, env)
- }
- }
+ private def getUserClasspath(
+ mainJar: Option[String],
+ secondaryJars: Option[String]): Array[URI] = {
+ val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_))
+ val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_))
+ (mainUri ++ secondaryUris).toArray
}
/**
@@ -847,27 +839,19 @@ object Client extends Logging {
*
* If not a "local:" file and no alternate name, the environment is not modified.
*
- * @param path Path to add to classpath (optional).
+ * @param uri URI to add to classpath (optional).
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
*/
private def addFileToClasspath(
- path: String,
+ uri: URI,
fileName: String,
env: HashMap[String, String]): Unit = {
- if (path != null) {
- scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
- val uri = new URI(path)
- if (uri.getScheme == LOCAL_SCHEME) {
- addClasspathEntry(uri.getPath, env)
- return
- }
- }
- }
- if (fileName != null) {
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + fileName, env
- )
+ if (uri != null && uri.getScheme == LOCAL_SCHEME) {
+ addClasspathEntry(uri.getPath, env)
+ } else if (fileName != null) {
+ addClasspathEntry(buildPath(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
}
}
@@ -963,4 +947,23 @@ object Client extends Logging {
new Path(qualifiedURI)
}
+ /**
+ * Whether to consider jars provided by the user to have precedence over the Spark jars when
+ * loading user classes.
+ */
+ def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = {
+ if (isDriver) {
+ conf.getBoolean("spark.driver.userClassPathFirst", false)
+ } else {
+ conf.getBoolean("spark.executor.userClassPathFirst", false)
+ }
+ }
+
+ /**
+ * Joins all the path components using Path.SEPARATOR.
+ */
+ def buildPath(components: String*): String = {
+ components.mkString(Path.SEPARATOR)
+ }
+
}
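
To make the new Client helpers concrete, here is a small, Hadoop-free sketch of buildPath and of the getUserClasspath overload that takes explicit jar options. A literal "/" stands in for Path.SEPARATOR and "__app__.jar" is assumed as the default app jar name, so the snippet runs on its own; it is an illustration of the behavior, not the class in the patch.

    import java.net.URI

    object ClientHelpersSketch {
      private val SEPARATOR = "/"           // stands in for Path.SEPARATOR
      private val APP_JAR = "__app__.jar"   // assumed default app jar name

      // Joins path components, as Client.buildPath does.
      def buildPath(components: String*): String = components.mkString(SEPARATOR)

      // Mirrors the private getUserClasspath overload: the main jar (or the
      // default name) plus any comma-separated secondary jars, all as URIs.
      def getUserClasspath(mainJar: Option[String], secondaryJars: Option[String]): Array[URI] = {
        val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_))
        val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_))
        (mainUri ++ secondaryUris).toArray
      }

      def main(args: Array[String]): Unit = {
        println(buildPath(".sparkStaging", "application_1234_0001"))
        // .sparkStaging/application_1234_0001
        println(getUserClasspath(Some("local:/opt/app.jar"), Some("a.jar,b.jar")).mkString(" "))
        // local:/opt/app.jar a.jar b.jar
      }
    }
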
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7cd8c5f0f9..6d5b8fda76 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy.yarn
+import java.io.File
import java.net.URI
import java.nio.ByteBuffer
@@ -57,7 +58,7 @@ class ExecutorRunnable(
var nmClient: NMClient = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
lazy val env = prepareEnvironment(container)
-
+
def run = {
logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
@@ -185,6 +186,16 @@ class ExecutorRunnable(
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+ val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
+ val absPath =
+ if (new File(uri.getPath()).isAbsolute()) {
+ uri.getPath()
+ } else {
+ Client.buildPath(Environment.PWD.$(), uri.getPath())
+ }
+ Seq("--user-class-path", "file:" + absPath)
+ }.toSeq
+
val commands = prefixEnv ++ Seq(
YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
"-server",
@@ -196,11 +207,13 @@ class ExecutorRunnable(
"-XX:OnOutOfMemoryError='kill %p'") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
- masterAddress.toString,
- slaveId.toString,
- hostname.toString,
- executorCores.toString,
- appId,
+ "--driver-url", masterAddress.toString,
+ "--executor-id", slaveId.toString,
+ "--hostname", hostname.toString,
+ "--cores", executorCores.toString,
+ "--app-id", appId) ++
+ userClassPath ++
+ Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties
index 287c8e3563..aab41fa494 100644
--- a/yarn/src/test/resources/log4j.properties
+++ b/yarn/src/test/resources/log4j.properties
@@ -16,7 +16,7 @@
#
# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+log4j.rootCategory=DEBUG, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
@@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
-org.eclipse.jetty.LEVEL=WARN
+log4j.logger.org.apache.hadoop=WARN
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 2bb3dcffd6..f8f8129d22 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -82,6 +82,7 @@ class ClientSuite extends FunSuite with Matchers {
test("Local jar URIs") {
val conf = new Configuration()
val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
+ .set("spark.yarn.user.classpath.first", "true")
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
@@ -98,13 +99,10 @@ class ClientSuite extends FunSuite with Matchers {
})
if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
cp should contain("{{PWD}}")
- cp should contain(s"{{PWD}}${Path.SEPARATOR}*")
} else if (Utils.isWindows) {
cp should contain("%PWD%")
- cp should contain(s"%PWD%${Path.SEPARATOR}*")
} else {
cp should contain(Environment.PWD.$())
- cp should contain(s"${Environment.PWD.$()}${File.separator}*")
}
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
@@ -117,7 +115,7 @@ class ClientSuite extends FunSuite with Matchers {
val client = spy(new Client(args, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), anyShort(), anyBoolean())
+ any(classOf[Path]), anyShort())
val tempDir = Utils.createTempDir()
try {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index e39de82740..0e37276ba7 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -17,27 +17,34 @@
package org.apache.spark.deploy.yarn
-import java.io.File
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.collection.mutable
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
+import com.google.common.io.ByteStreams
import com.google.common.io.Files
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.util.Utils
+/**
+ * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN
+ * applications, and require the Spark assembly to be built before they can be successfully
+ * run.
+ */
class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
- // log4j configuration for the Yarn containers, so that their output is collected
- // by Yarn instead of trying to overwrite unit-tests.log.
+ // log4j configuration for the YARN containers, so that their output is collected
+ // by YARN instead of trying to overwrite unit-tests.log.
private val LOG4J_CONF = """
|log4j.rootCategory=DEBUG, console
|log4j.appender.console=org.apache.log4j.ConsoleAppender
@@ -52,13 +59,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
|
|from pyspark import SparkConf , SparkContext
|if __name__ == "__main__":
- | if len(sys.argv) != 3:
- | print >> sys.stderr, "Usage: test.py [master] [result file]"
+ | if len(sys.argv) != 2:
+ | print >> sys.stderr, "Usage: test.py [result file]"
| exit(-1)
- | conf = SparkConf()
- | conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode")
- | sc = SparkContext(conf=conf)
- | status = open(sys.argv[2],'w')
+ | sc = SparkContext(conf=SparkConf())
+ | status = open(sys.argv[1],'w')
| result = "failure"
| rdd = sc.parallelize(range(10))
| cnt = rdd.count()
@@ -72,23 +77,17 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
- private var oldConf: Map[String, String] = _
+ private var logConfDir: File = _
override def beforeAll() {
super.beforeAll()
tempDir = Utils.createTempDir()
-
- val logConfDir = new File(tempDir, "log4j")
+ logConfDir = new File(tempDir, "log4j")
logConfDir.mkdir()
val logConfFile = new File(logConfDir, "log4j.properties")
- Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
-
- val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
- sys.props("java.class.path")
-
- oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
+ Files.write(LOG4J_CONF, logConfFile, UTF_8)
yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster.init(new YarnConfiguration())
@@ -119,99 +118,165 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
- config.foreach { e =>
- sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
- }
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
- val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
- sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
- sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome)
- sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
- sys.props += ("spark.executor.instances" -> "1")
- sys.props += ("spark.driver.extraClassPath" -> childClasspath)
- sys.props += ("spark.executor.extraClassPath" -> childClasspath)
- sys.props += ("spark.executor.extraJavaOptions" -> "-Dfoo=\"one two three\"")
- sys.props += ("spark.driver.extraJavaOptions" -> "-Dfoo=\"one two three\"")
}
override def afterAll() {
yarnCluster.stop()
- sys.props.retain { case (k, v) => !k.startsWith("spark.") }
- sys.props ++= oldConf
super.afterAll()
}
test("run Spark in yarn-client mode") {
- var result = File.createTempFile("result", null, tempDir)
- YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
- checkResult(result)
-
- // verify log urls are present
- YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
- assert(info.logUrlMap.nonEmpty)
- }
+ testBasicYarnApp(true)
}
test("run Spark in yarn-cluster mode") {
- val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
- var result = File.createTempFile("result", null, tempDir)
-
- val args = Array("--class", main,
- "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--arg", result.getAbsolutePath(),
- "--num-executors", "1")
- Client.main(args)
- checkResult(result)
-
- // verify log urls are present.
- YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
- assert(info.logUrlMap.nonEmpty)
- }
+ testBasicYarnApp(false)
}
test("run Spark in yarn-cluster mode unsuccessfully") {
- val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
-
- // Use only one argument so the driver will fail
- val args = Array("--class", main,
- "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--num-executors", "1")
+ // Don't provide arguments so the driver will fail.
val exception = intercept[SparkException] {
- Client.main(args)
+ runSpark(false, mainClassName(YarnClusterDriver.getClass))
+ fail("Spark application should have failed.")
}
- assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
}
test("run Python application in yarn-cluster mode") {
val primaryPyFile = new File(tempDir, "test.py")
- Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8)
+ Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
val pyFile = new File(tempDir, "test2.py")
- Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8)
+ Files.write(TEST_PYFILE, pyFile, UTF_8)
var result = File.createTempFile("result", null, tempDir)
- val args = Array("--class", "org.apache.spark.deploy.PythonRunner",
- "--primary-py-file", primaryPyFile.getAbsolutePath(),
- "--py-files", pyFile.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--arg", result.getAbsolutePath(),
- "--name", "python test in yarn-cluster mode",
- "--num-executors", "1")
- Client.main(args)
+ // The sbt assembly does not include pyspark / py4j python dependencies, so we need to
+ // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala.
+ val sparkHome = sys.props("spark.test.home")
+ val extraConf = Map(
+ "spark.executorEnv.SPARK_HOME" -> sparkHome,
+ "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
+
+ runSpark(false, primaryPyFile.getAbsolutePath(),
+ sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()),
+ appArgs = Seq(result.getAbsolutePath()),
+ extraConf = extraConf)
checkResult(result)
}
+ test("user class path first in client mode") {
+ testUseClassPathFirst(true)
+ }
+
+ test("user class path first in cluster mode") {
+ testUseClassPathFirst(false)
+ }
+
+ private def testBasicYarnApp(clientMode: Boolean): Unit = {
+ var result = File.createTempFile("result", null, tempDir)
+ runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
+ appArgs = Seq(result.getAbsolutePath()))
+ checkResult(result)
+ }
+
+ private def testUseClassPathFirst(clientMode: Boolean): Unit = {
+ // Create a jar file that contains a different version of "test.resource".
+ val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+ val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir)
+ val driverResult = File.createTempFile("driver", null, tempDir)
+ val executorResult = File.createTempFile("executor", null, tempDir)
+ runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+ appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+ extraClassPath = Seq(originalJar.getPath()),
+ extraJars = Seq("local:" + userJar.getPath()),
+ extraConf = Map(
+ "spark.driver.userClassPathFirst" -> "true",
+ "spark.executor.userClassPathFirst" -> "true"))
+ checkResult(driverResult, "OVERRIDDEN")
+ checkResult(executorResult, "OVERRIDDEN")
+ }
+
+ private def runSpark(
+ clientMode: Boolean,
+ klass: String,
+ appArgs: Seq[String] = Nil,
+ sparkArgs: Seq[String] = Nil,
+ extraClassPath: Seq[String] = Nil,
+ extraJars: Seq[String] = Nil,
+ extraConf: Map[String, String] = Map()): Unit = {
+ val master = if (clientMode) "yarn-client" else "yarn-cluster"
+ val props = new Properties()
+
+ props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+
+ val childClasspath = logConfDir.getAbsolutePath() +
+ File.pathSeparator +
+ sys.props("java.class.path") +
+ File.pathSeparator +
+ extraClassPath.mkString(File.pathSeparator)
+ props.setProperty("spark.driver.extraClassPath", childClasspath)
+ props.setProperty("spark.executor.extraClassPath", childClasspath)
+
+ // SPARK-4267: make sure java options are propagated correctly.
+ props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
+ props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")
+
+ yarnCluster.getConfig().foreach { e =>
+ props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
+ }
+
+ sys.props.foreach { case (k, v) =>
+ if (k.startsWith("spark.")) {
+ props.setProperty(k, v)
+ }
+ }
+
+ extraConf.foreach { case (k, v) => props.setProperty(k, v) }
+
+ val propsFile = File.createTempFile("spark", ".properties", tempDir)
+ val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8)
+ props.store(writer, "Spark properties.")
+ writer.close()
+
+ val extraJarArgs = if (extraJars.nonEmpty) Seq("--jars", extraJars.mkString(",")) else Nil
+ val mainArgs =
+ if (klass.endsWith(".py")) {
+ Seq(klass)
+ } else {
+ Seq("--class", klass, fakeSparkJar.getAbsolutePath())
+ }
+ val argv =
+ Seq(
+ new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(),
+ "--master", master,
+ "--num-executors", "1",
+ "--properties-file", propsFile.getAbsolutePath()) ++
+ extraJarArgs ++
+ sparkArgs ++
+ mainArgs ++
+ appArgs
+
+ Utils.executeAndGetOutput(argv,
+ extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+ }
+
/**
* This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
* any sort of error when the job process finishes successfully, but the job itself fails. So
* the tests enforce that something is written to a file after everything is ok to indicate
* that the job succeeded.
*/
- private def checkResult(result: File) = {
- var resultString = Files.toString(result, Charsets.UTF_8)
- resultString should be ("success")
+ private def checkResult(result: File): Unit = {
+ checkResult(result, "success")
+ }
+
+ private def checkResult(result: File, expected: String): Unit = {
+ var resultString = Files.toString(result, UTF_8)
+ resultString should be (expected)
+ }
+
+ private def mainClassName(klass: Class[_]): String = {
+ klass.getName().stripSuffix("$")
}
}
@@ -229,22 +294,22 @@ private object YarnClusterDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000
var listener: SaveExecutorInfo = null
- def main(args: Array[String]) = {
- if (args.length != 2) {
+ def main(args: Array[String]): Unit = {
+ if (args.length != 1) {
System.err.println(
s"""
|Invalid command line: ${args.mkString(" ")}
|
- |Usage: YarnClusterDriver [master] [result file]
+ |Usage: YarnClusterDriver [result file]
""".stripMargin)
System.exit(1)
}
listener = new SaveExecutorInfo
- val sc = new SparkContext(new SparkConf().setMaster(args(0))
+ val sc = new SparkContext(new SparkConf()
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
sc.addSparkListener(listener)
- val status = new File(args(1))
+ val status = new File(args(0))
var result = "failure"
try {
val data = sc.parallelize(1 to 4, 4).collect().toSet
@@ -253,7 +318,48 @@ private object YarnClusterDriver extends Logging with Matchers {
result = "success"
} finally {
sc.stop()
- Files.write(result, status, Charsets.UTF_8)
+ Files.write(result, status, UTF_8)
+ }
+
+ // verify log urls are present
+ listener.addedExecutorInfos.values.foreach { info =>
+ assert(info.logUrlMap.nonEmpty)
+ }
+ }
+
+}
+
+private object YarnClasspathTest {
+
+ def main(args: Array[String]): Unit = {
+ if (args.length != 2) {
+ System.err.println(
+ s"""
+ |Invalid command line: ${args.mkString(" ")}
+ |
+ |Usage: YarnClasspathTest [driver result file] [executor result file]
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ readResource(args(0))
+ val sc = new SparkContext(new SparkConf())
+ try {
+ sc.parallelize(Seq(1)).foreach { x => readResource(args(1)) }
+ } finally {
+ sc.stop()
+ }
+ }
+
+ private def readResource(resultPath: String): Unit = {
+ var result = "failure"
+ try {
+ val ccl = Thread.currentThread().getContextClassLoader()
+ val resource = ccl.getResourceAsStream("test.resource")
+ val bytes = ByteStreams.toByteArray(resource)
+ result = new String(bytes, 0, bytes.length, UTF_8)
+ } finally {
+ Files.write(result, new File(resultPath), UTF_8)
}
}
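
As the comment above checkResult explains, yarn-cluster mode gives the test JVM no direct signal when the user code fails, so the in-cluster driver writes a marker file that the test asserts on afterwards. Below is a minimal sketch of that handshake with a hypothetical helper name; the suite inlines the pattern rather than factoring it out.

    import java.io.File
    import com.google.common.base.Charsets.UTF_8
    import com.google.common.io.Files

    object ResultFileSketch {
      // Runs `body` and records "success" or "failure" in the given status file,
      // mirroring what YarnClusterDriver and YarnClasspathTest do.
      def recordingResult(status: File)(body: => Unit): Unit = {
        var result = "failure"
        try {
          body
          result = "success"
        } finally {
          Files.write(result, status, UTF_8)
        }
      }

      def main(args: Array[String]): Unit = {
        val status = File.createTempFile("result", null)
        recordingResult(status) { require(1 + 1 == 2) }
        println(Files.toString(status, UTF_8))  // success
      }
    }
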