-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala  |  83
-rw-r--r--  core/src/main/scala/org/apache/spark/TestUtils.scala  |  19
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala  |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala  |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala  |  15
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala  |  20
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala  |  83
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  |  52
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala  |  84
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala  |  21
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala  |  103
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala  |  7
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala  |  12
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  |  27
-rw-r--r--  core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala (renamed from core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala)  |  12
-rw-r--r--  docs/configuration.md  |  31
-rw-r--r--  pom.xml  |  12
-rw-r--r--  project/SparkBuild.scala  |  8
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  |  25
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala  |  133
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala  |  25
-rw-r--r--  yarn/src/test/resources/log4j.properties  |  4
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala  |  6
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala  |  276
27 files changed, 736 insertions, 348 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 13aa9960ac..0dbd26146c 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet
@@ -67,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value for " + key)
}
- settings.put(key, value)
+ settings.put(translateConfKey(key, warn = true), value)
this
}
@@ -139,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
- settings.putIfAbsent(key, value)
+ settings.putIfAbsent(translateConfKey(key, warn = true), value)
this
}
@@ -175,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
- Option(settings.get(key))
+ Option(settings.get(translateConfKey(key)))
}
/** Get all parameters as a list of pairs */
@@ -228,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getAppId: String = get("spark.app.id")
/** Does the configuration contain a given parameter? */
- def contains(key: String): Boolean = settings.containsKey(key)
+ def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
/** Copy this object */
override def clone: SparkConf = {
@@ -285,7 +286,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
// Validate memory fractions
val memoryKeys = Seq(
"spark.storage.memoryFraction",
- "spark.shuffle.memoryFraction",
+ "spark.shuffle.memoryFraction",
"spark.shuffle.safetyFraction",
"spark.storage.unrollFraction",
"spark.storage.safetyFraction")
@@ -351,9 +352,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def toDebugString: String = {
getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}
+
}
-private[spark] object SparkConf {
+private[spark] object SparkConf extends Logging {
+
+ private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
+ val configs = Seq(
+ DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
+ "1.3"),
+ DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
+ "Use spark.{driver,executor}.userClassPathFirst instead."))
+ configs.map { x => (x.oldName, x) }.toMap
+ }
+
/**
* Return whether the given config is an akka config (e.g. akka.actor.provider).
* Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
@@ -380,4 +392,63 @@ private[spark] object SparkConf {
def isSparkPortConf(name: String): Boolean = {
(name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
}
+
+ /**
+ * Translate the configuration key if it is deprecated and has a replacement; otherwise return
+ * the provided key.
+ *
+ * @param userKey Configuration key from the user / caller.
+ * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
+ * only once for each key.
+ */
+ def translateConfKey(userKey: String, warn: Boolean = false): String = {
+ deprecatedConfigs.get(userKey)
+ .map { deprecatedKey =>
+ if (warn) {
+ deprecatedKey.warn()
+ }
+ deprecatedKey.newName.getOrElse(userKey)
+ }.getOrElse(userKey)
+ }
+
+ /**
+ * Holds information about keys that have been deprecated or renamed.
+ *
+ * @param oldName Old configuration key.
+ * @param newName New configuration key, or `null` if key has no replacement, in which case the
+ * deprecated key will be used (but the warning message will still be printed).
+ * @param version Version of Spark where key was deprecated.
+ * @param deprecationMessage Message to include in the deprecation warning; mandatory when
+ * `newName` is not provided.
+ */
+ private case class DeprecatedConfig(
+ oldName: String,
+ _newName: String,
+ version: String,
+ deprecationMessage: String = null) {
+
+ private val warned = new AtomicBoolean(false)
+ val newName = Option(_newName)
+
+ if (newName.isEmpty && (deprecationMessage == null || deprecationMessage.isEmpty())) {
+ throw new IllegalArgumentException("Need new config name or deprecation message.")
+ }
+
+ def warn(): Unit = {
+ if (warned.compareAndSet(false, true)) {
+ if (newName.isDefined) {
+ val message = Option(deprecationMessage).getOrElse(
+ s"Please use the alternative '${newName.get}' instead.")
+ logWarning(
+ s"The configuration option '$oldName' has been replaced as of Spark $version and " +
+ s"may be removed in the future. $message")
+ } else {
+ logWarning(
+ s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
+ s"may be removed in the future. $deprecationMessage")
+ }
+ }
+ }
+
+ }
}
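
The deprecation handling added to SparkConf.scala above means a value written under an old key becomes readable under its replacement. A minimal sketch of that behavior (it mirrors the assertions added to SparkConfSuite further down; nothing is assumed beyond a plain SparkConf):

    import org.apache.spark.SparkConf

    // set() translates the deprecated key and logs a one-time warning;
    // reads through either name resolve to the same entry.
    val conf = new SparkConf().set("spark.files.userClassPathFirst", "true")
    assert(conf.getBoolean("spark.executor.userClassPathFirst", false))
    assert(conf.getBoolean("spark.files.userClassPathFirst", false))
    assert(conf.contains("spark.executor.userClassPathFirst"))
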
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index be081c3825..35b324ba6f 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -17,12 +17,13 @@
package org.apache.spark
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.net.{URI, URL}
import java.util.jar.{JarEntry, JarOutputStream}
import scala.collection.JavaConversions._
+import com.google.common.base.Charsets.UTF_8
import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
@@ -59,6 +60,22 @@ private[spark] object TestUtils {
createJar(files1 ++ files2, jarFile)
}
+ /**
+ * Create a jar file containing multiple files. The `files` map contains a mapping of
+ * file names in the jar file to their contents.
+ */
+ def createJarWithFiles(files: Map[String, String], dir: File = null): URL = {
+ val tempDir = Option(dir).getOrElse(Utils.createTempDir())
+ val jarFile = File.createTempFile("testJar", ".jar", tempDir)
+ val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
+ files.foreach { case (k, v) =>
+ val entry = new JarEntry(k)
+ jarStream.putNextEntry(entry)
+ ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
+ }
+ jarStream.close()
+ jarFile.toURI.toURL
+ }
/**
* Create a jar file that contains this set of files. All files will be located at the root
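
The new TestUtils.createJarWithFiles helper packages a map of entry names to string contents into a temporary jar and returns its URL. A short usage sketch (the entry name and contents are the same placeholders the SparkSubmitSuite change below uses):

    import org.apache.spark.TestUtils

    // Builds a temp jar with one entry, "test.resource", containing the UTF-8
    // bytes of "USER", and returns the jar's URL for use on a classpath.
    val userJar: java.net.URL = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"))
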
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 38b3da0b13..237d26fc6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -68,8 +68,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
- val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
- driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
+ val command = new Command(mainClass,
+ Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
+ sys.env, classPathEntries, libraryPathEntries, javaOpts)
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6d213926f3..c4bc5054d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,7 +37,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
import org.apache.spark.deploy.rest._
import org.apache.spark.executor._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
/**
* Whether to submit, kill, or request the status of an application.
@@ -467,11 +467,11 @@ object SparkSubmit {
}
val loader =
- if (sysProps.getOrElse("spark.files.userClassPathFirst", "false").toBoolean) {
- new ChildExecutorURLClassLoader(new Array[URL](0),
+ if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+ new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
- new ExecutorURLClassLoader(new Array[URL](0),
+ new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index b47a081053..fd514f0766 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -196,7 +196,7 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td sorttable_customkey={driver.desc.mem.toString}>
{Utils.megabytesToString(driver.desc.mem.toLong)}
</td>
- <td>{driver.desc.command.arguments(1)}</td>
+ <td>{driver.desc.command.arguments(2)}</td>
</tr>
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 2033d67e1f..6e4486e20f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -392,7 +392,7 @@ private class SubmitRequestServlet(
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
- Seq("{{WORKER_URL}}", mainClass) ++ appArgs, // args to the DriverWrapper
+ Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 28cab36c7b..b964a09bdb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -74,10 +74,15 @@ private[spark] class DriverRunner(
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
- // Make sure user application jar is on the classpath
+ def substituteVariables(argument: String): String = argument match {
+ case "{{WORKER_URL}}" => workerUrl
+ case "{{USER_JAR}}" => localJarFilename
+ case other => other
+ }
+
// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
- sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename))
+ sparkHome.getAbsolutePath, substituteVariables)
launchDriver(builder, driverDir, driverDesc.supervise)
}
catch {
@@ -111,12 +116,6 @@ private[spark] class DriverRunner(
}
}
- /** Replace variables in a command argument passed to us */
- private def substituteVariables(argument: String): String = argument match {
- case "{{WORKER_URL}}" => workerUrl
- case other => other
- }
-
/**
* Creates the working directory for this driver.
* Will throw an exception if there are errors preparing the directory.
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 05e242e6df..ab467a5ee8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -17,10 +17,12 @@
package org.apache.spark.deploy.worker
+import java.io.File
+
import akka.actor._
import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
/**
* Utility object for launching driver programs such that they share fate with the Worker process.
@@ -28,21 +30,31 @@ import org.apache.spark.util.{AkkaUtils, Utils}
object DriverWrapper {
def main(args: Array[String]) {
args.toList match {
- case workerUrl :: mainClass :: extraArgs =>
+ case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
+ val currentLoader = Thread.currentThread.getContextClassLoader
+ val userJarUrl = new File(userJar).toURI().toURL()
+ val loader =
+ if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+ new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
+ } else {
+ new MutableURLClassLoader(Array(userJarUrl), currentLoader)
+ }
+ Thread.currentThread.setContextClassLoader(loader)
+
// Delegate to supplied main class
- val clazz = Class.forName(args(1))
+ val clazz = Class.forName(mainClass, true, loader)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
actorSystem.shutdown()
case _ =>
- System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")
+ System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
System.exit(-1)
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3a42f8b157..dd19e4947d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -17,8 +17,10 @@
package org.apache.spark.executor
+import java.net.URL
import java.nio.ByteBuffer
+import scala.collection.mutable
import scala.concurrent.Await
import akka.actor.{Actor, ActorSelection, Props}
@@ -38,6 +40,7 @@ private[spark] class CoarseGrainedExecutorBackend(
executorId: String,
hostPort: String,
cores: Int,
+ userClassPath: Seq[URL],
env: SparkEnv)
extends Actor with ActorLogReceive with ExecutorBackend with Logging {
@@ -63,7 +66,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
val (hostname, _) = Utils.parseHostPort(hostPort)
- executor = new Executor(executorId, hostname, env, isLocal = false)
+ executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -117,7 +120,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
hostname: String,
cores: Int,
appId: String,
- workerUrl: Option[String]) {
+ workerUrl: Option[String],
+ userClassPath: Seq[URL]) {
SignalLogger.register(log)
@@ -162,7 +166,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val sparkHostPort = hostname + ":" + boundPort
env.actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend],
- driverUrl, executorId, sparkHostPort, cores, env),
+ driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
name = "Executor")
workerUrl.foreach { url =>
env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
@@ -172,20 +176,69 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
def main(args: Array[String]) {
- args.length match {
- case x if x < 5 =>
- System.err.println(
+ var driverUrl: String = null
+ var executorId: String = null
+ var hostname: String = null
+ var cores: Int = 0
+ var appId: String = null
+ var workerUrl: Option[String] = None
+ val userClassPath = new mutable.ListBuffer[URL]()
+
+ var argv = args.toList
+ while (!argv.isEmpty) {
+ argv match {
+ case ("--driver-url") :: value :: tail =>
+ driverUrl = value
+ argv = tail
+ case ("--executor-id") :: value :: tail =>
+ executorId = value
+ argv = tail
+ case ("--hostname") :: value :: tail =>
+ hostname = value
+ argv = tail
+ case ("--cores") :: value :: tail =>
+ cores = value.toInt
+ argv = tail
+ case ("--app-id") :: value :: tail =>
+ appId = value
+ argv = tail
+ case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
- "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
- "<cores> <appid> [<workerUrl>] ")
- System.exit(1)
+ workerUrl = Some(value)
+ argv = tail
+ case ("--user-class-path") :: value :: tail =>
+ userClassPath += new URL(value)
+ argv = tail
+ case Nil =>
+ case tail =>
+ System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
+ printUsageAndExit()
+ }
+ }
- // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
- // and CoarseMesosSchedulerBackend (for mesos mode).
- case 5 =>
- run(args(0), args(1), args(2), args(3).toInt, args(4), None)
- case x if x > 5 =>
- run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
+ if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
+ appId == null) {
+ printUsageAndExit()
}
+
+ run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
+
+ private def printUsageAndExit() = {
+ System.err.println(
+ """
+ |Usage: CoarseGrainedExecutorBackend [options]
+ |
+ | Options are:
+ | --driver-url <driverUrl>
+ | --executor-id <executorId>
+ | --hostname <hostname>
+ | --cores <cores>
+ | --app-id <appid>
+ | --worker-url <workerUrl>
+ | --user-class-path <url>
+ |""".stripMargin)
+ System.exit(1)
+ }
+
}
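
With the positional arguments replaced by flags, callers now assemble an argument list along the lines of the following sketch (all values are placeholders; the standalone backend below substitutes {{EXECUTOR_ID}}-style templates at launch time, and --user-class-path may be repeated once per jar):

    // Roughly what a scheduler backend passes to CoarseGrainedExecutorBackend.main().
    val executorArgs = Seq(
      "--driver-url", "<driverUrl>",
      "--executor-id", "<executorId>",
      "--hostname", "<hostname>",
      "--cores", "4",
      "--app-id", "<appId>",
      "--worker-url", "<workerUrl>",               // standalone only, for fate-sharing
      "--user-class-path", "file:/path/user.jar")  // optional, repeatable
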
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5141483d1e..6b22dcd6f5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor
import java.io.File
import java.lang.management.ManagementFactory
+import java.net.URL
import java.nio.ByteBuffer
import java.util.concurrent._
@@ -33,7 +34,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{SparkUncaughtExceptionHandler, AkkaUtils, Utils}
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader,
+ SparkUncaughtExceptionHandler, AkkaUtils, Utils}
/**
* Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -43,6 +45,7 @@ private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
+ userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false)
extends Logging
{
@@ -288,17 +291,23 @@ private[spark] class Executor(
* created by the interpreter to the search path
*/
private def createClassLoader(): MutableURLClassLoader = {
+ // Bootstrap the list of jars with the user class path.
+ val now = System.currentTimeMillis()
+ userClassPath.foreach { url =>
+ currentJars(url.getPath().split("/").last) = now
+ }
+
val currentLoader = Utils.getContextOrSparkClassLoader
// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
- val urls = currentJars.keySet.map { uri =>
+ val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
new File(uri.split("/").last).toURI.toURL
- }.toArray
- val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
- userClassPathFirst match {
- case true => new ChildExecutorURLClassLoader(urls, currentLoader)
- case false => new ExecutorURLClassLoader(urls, currentLoader)
+ }
+ if (conf.getBoolean("spark.executor.userClassPathFirst", false)) {
+ new ChildFirstURLClassLoader(urls, currentLoader)
+ } else {
+ new MutableURLClassLoader(urls, currentLoader)
}
}
@@ -311,7 +320,7 @@ private[spark] class Executor(
if (classUri != null) {
logInfo("Using REPL class URI: " + classUri)
val userClassPathFirst: java.lang.Boolean =
- conf.getBoolean("spark.files.userClassPathFirst", false)
+ conf.getBoolean("spark.executor.userClassPathFirst", false)
try {
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
@@ -344,18 +353,23 @@ private[spark] class Executor(
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentFiles(name) = timestamp
}
- for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
- logInfo("Fetching " + name + " with timestamp " + timestamp)
- // Fetch file with useCache mode, close cache for local mode.
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
- env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
- currentJars(name) = timestamp
- // Add it to our class loader
+ for ((name, timestamp) <- newJars) {
val localName = name.split("/").last
- val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
- if (!urlClassLoader.getURLs.contains(url)) {
- logInfo("Adding " + url + " to class loader")
- urlClassLoader.addURL(url)
+ val currentTimeStamp = currentJars.get(name)
+ .orElse(currentJars.get(localName))
+ .getOrElse(-1L)
+ if (currentTimeStamp < timestamp) {
+ logInfo("Fetching " + name + " with timestamp " + timestamp)
+ // Fetch file with useCache mode, close cache for local mode.
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+ env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+ currentJars(name) = timestamp
+ // Add it to our class loader
+ val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
+ if (!urlClassLoader.getURLs.contains(url)) {
+ logInfo("Adding " + url + " to class loader")
+ urlClassLoader.addURL(url)
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
deleted file mode 100644
index 8011e75944..0000000000
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import java.net.{URLClassLoader, URL}
-
-import org.apache.spark.util.ParentClassLoader
-
-/**
- * The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
- * We also make changes so user classes can come before the default classes.
- */
-
-private[spark] trait MutableURLClassLoader extends ClassLoader {
- def addURL(url: URL)
- def getURLs: Array[URL]
-}
-
-private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
- extends MutableURLClassLoader {
-
- private object userClassLoader extends URLClassLoader(urls, null){
- override def addURL(url: URL) {
- super.addURL(url)
- }
- override def findClass(name: String): Class[_] = {
- val loaded = super.findLoadedClass(name)
- if (loaded != null) {
- return loaded
- }
- try {
- super.findClass(name)
- } catch {
- case e: ClassNotFoundException => {
- parentClassLoader.loadClass(name)
- }
- }
- }
- }
-
- private val parentClassLoader = new ParentClassLoader(parent)
-
- override def findClass(name: String): Class[_] = {
- try {
- userClassLoader.findClass(name)
- } catch {
- case e: ClassNotFoundException => {
- parentClassLoader.loadClass(name)
- }
- }
- }
-
- def addURL(url: URL) {
- userClassLoader.addURL(url)
- }
-
- def getURLs() = {
- userClassLoader.getURLs()
- }
-}
-
-private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
- extends URLClassLoader(urls, parent) with MutableURLClassLoader {
-
- override def addURL(url: URL) {
- super.addURL(url)
- }
-}
-
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index d2e1680a5f..40fc6b59cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -52,8 +52,13 @@ private[spark] class SparkDeploySchedulerBackend(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
- "{{WORKER_URL}}")
+ val args = Seq(
+ "--driver-url", driverUrl,
+ "--executor-id", "{{EXECUTOR_ID}}",
+ "--hostname", "{{HOSTNAME}}",
+ "--cores", "{{CORES}}",
+ "--app-id", "{{APP_ID}}",
+ "--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 0d1c2a916c..90dfe14352 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -154,18 +154,25 @@ private[spark] class CoarseMesosSchedulerBackend(
if (uri == null) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
- "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
- prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue,
- offer.getHostname, numCores, appId))
+ "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
+ .format(prefixEnv, runScript) +
+ s" --driver-url $driverUrl" +
+ s" --executor-id ${offer.getSlaveId.getValue}" +
+ s" --hostname ${offer.getHostname}" +
+ s" --cores $numCores" +
+ s" --app-id $appId")
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
- ("cd %s*; %s " +
- "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
- .format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue,
- offer.getHostname, numCores, appId))
+ s"cd $basename*; $prefixEnv " +
+ "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
+ s" --driver-url $driverUrl" +
+ s" --executor-id ${offer.getSlaveId.getValue}" +
+ s" --hostname ${offer.getHostname}" +
+ s" --cores $numCores" +
+ s" --app-id $appId")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()
diff --git a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
new file mode 100644
index 0000000000..d9c7103b2f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.{URLClassLoader, URL}
+import java.util.Enumeration
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.ParentClassLoader
+
+/**
+ * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
+ */
+private[spark] class MutableURLClassLoader(urls: Array[URL], parent: ClassLoader)
+ extends URLClassLoader(urls, parent) {
+
+ override def addURL(url: URL): Unit = {
+ super.addURL(url)
+ }
+
+ override def getURLs(): Array[URL] = {
+ super.getURLs()
+ }
+
+}
+
+/**
+ * A mutable class loader that gives preference to its own URLs over the parent class loader
+ * when loading classes and resources.
+ */
+private[spark] class ChildFirstURLClassLoader(urls: Array[URL], parent: ClassLoader)
+ extends MutableURLClassLoader(urls, null) {
+
+ private val parentClassLoader = new ParentClassLoader(parent)
+
+ /**
+ * Used to implement fine-grained class loading locks similar to what is done by Java 7. This
+ * prevents deadlock issues when using non-hierarchical class loaders.
+ *
+ * Note that due to Java 6 compatibility (and some issues with implementing class loaders in
+ * Scala), Java 7's `ClassLoader.registerAsParallelCapable` method is not called.
+ */
+ private val locks = new ConcurrentHashMap[String, Object]()
+
+ override def loadClass(name: String, resolve: Boolean): Class[_] = {
+ var lock = locks.get(name)
+ if (lock == null) {
+ val newLock = new Object()
+ lock = locks.putIfAbsent(name, newLock)
+ if (lock == null) {
+ lock = newLock
+ }
+ }
+
+ lock.synchronized {
+ try {
+ super.loadClass(name, resolve)
+ } catch {
+ case e: ClassNotFoundException =>
+ parentClassLoader.loadClass(name, resolve)
+ }
+ }
+ }
+
+ override def getResource(name: String): URL = {
+ val url = super.findResource(name)
+ val res = if (url != null) url else parentClassLoader.getResource(name)
+ res
+ }
+
+ override def getResources(name: String): Enumeration[URL] = {
+ val urls = super.findResources(name)
+ val res =
+ if (urls != null && urls.hasMoreElements()) {
+ urls
+ } else {
+ parentClassLoader.getResources(name)
+ }
+ res
+ }
+
+ override def addURL(url: URL) {
+ super.addURL(url)
+ }
+
+}
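
To make the child-first delegation concrete, here is a small sketch that exercises the resource path (the two-jar setup is hypothetical; it reuses TestUtils.createJarWithFiles from this change and the Guava helpers imported elsewhere in this patch):

    import java.net.URLClassLoader
    import com.google.common.base.Charsets.UTF_8
    import com.google.common.io.ByteStreams
    import org.apache.spark.TestUtils
    import org.apache.spark.util.ChildFirstURLClassLoader

    // Two jars carrying the same resource name with different contents.
    val childJar = TestUtils.createJarWithFiles(Map("test.resource" -> "child"))
    val parentJar = TestUtils.createJarWithFiles(Map("test.resource" -> "parent"))

    val parentLoader = new URLClassLoader(Array(parentJar), null)
    val childFirst = new ChildFirstURLClassLoader(Array(childJar), parentLoader)

    // getResource consults the loader's own URLs before falling back to the
    // parent, so the child copy wins here.
    val bytes = ByteStreams.toByteArray(childFirst.getResourceAsStream("test.resource"))
    assert(new String(bytes, UTF_8) == "child")
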
diff --git a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
index 3abc12681f..6d8d9e8da3 100644
--- a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
@@ -18,7 +18,7 @@
package org.apache.spark.util
/**
- * A class loader which makes findClass accesible to the child
+ * A class loader which makes some protected methods in ClassLoader accessible.
*/
private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(parent) {
@@ -29,4 +29,9 @@ private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(
override def loadClass(name: String): Class[_] = {
super.loadClass(name)
}
+
+ override def loadClass(name: String, resolve: Boolean): Class[_] = {
+ super.loadClass(name, resolve)
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e08210ae60..ea6b73bc68 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -197,6 +197,18 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
serializer.newInstance().serialize(new StringBuffer())
}
+ test("deprecated config keys") {
+ val conf = new SparkConf()
+ .set("spark.files.userClassPathFirst", "true")
+ .set("spark.yarn.user.classpath.first", "true")
+ assert(conf.contains("spark.files.userClassPathFirst"))
+ assert(conf.contains("spark.executor.userClassPathFirst"))
+ assert(conf.contains("spark.yarn.user.classpath.first"))
+ assert(conf.getBoolean("spark.files.userClassPathFirst", false))
+ assert(conf.getBoolean("spark.executor.userClassPathFirst", false))
+ assert(conf.getBoolean("spark.yarn.user.classpath.first", false))
+ }
+
}
class Class1 {}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1ddccae126..46d745c4ec 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,6 +21,8 @@ import java.io._
import scala.collection.mutable.ArrayBuffer
+import com.google.common.base.Charsets.UTF_8
+import com.google.common.io.ByteStreams
import org.scalatest.FunSuite
import org.scalatest.Matchers
import org.scalatest.concurrent.Timeouts
@@ -450,6 +452,19 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
}
+ test("user classpath first in driver") {
+ val systemJar = TestUtils.createJarWithFiles(Map("test.resource" -> "SYSTEM"))
+ val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"))
+ val args = Seq(
+ "--class", UserClasspathFirstTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local",
+ "--conf", "spark.driver.extraClassPath=" + systemJar,
+ "--conf", "spark.driver.userClassPathFirst=true",
+ userJar.toString)
+ runSparkSubmit(args)
+ }
+
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
@@ -541,3 +556,15 @@ object SimpleApplicationTest {
}
}
}
+
+object UserClasspathFirstTest {
+ def main(args: Array[String]) {
+ val ccl = Thread.currentThread().getContextClassLoader()
+ val resource = ccl.getResourceAsStream("test.resource")
+ val bytes = ByteStreams.toByteArray(resource)
+ val contents = new String(bytes, 0, bytes.length, UTF_8)
+ if (contents != "USER") {
+ throw new SparkException("Should have read user resource, but instead read: " + contents)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index b7912c09d1..31e3b7e7bb 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.executor
+package org.apache.spark.util
import java.net.URLClassLoader
@@ -24,7 +24,7 @@ import org.scalatest.FunSuite
import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, TestUtils}
import org.apache.spark.util.Utils
-class ExecutorURLClassLoaderSuite extends FunSuite {
+class MutableURLClassLoaderSuite extends FunSuite {
val urls2 = List(TestUtils.createJarWithClasses(
classNames = Seq("FakeClass1", "FakeClass2", "FakeClass3"),
@@ -37,7 +37,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
@@ -47,7 +47,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("parent first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new MutableURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass1").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -57,7 +57,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first can fall back") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -65,7 +65,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first can fail") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("FakeClassDoesNotExist").newInstance()
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 00e973c245..eb0d6d33c9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -231,6 +231,15 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.driver.userClassPathFirst</code></td>
+ <td>false</td>
+ <td>
+ (Experimental) Whether to give user-added jars precedence over Spark's own jars when loading
+ classes in the the driver. This feature can be used to mitigate conflicts between Spark's
+ dependencies and user dependencies. It is currently an experimental feature.
+ </td>
+</tr>
+<tr>
<td><code>spark.executor.extraJavaOptions</code></td>
<td>(none)</td>
<td>
@@ -297,13 +306,11 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.files.userClassPathFirst</code></td>
+ <td><code>spark.executor.userClassPathFirst</code></td>
<td>false</td>
<td>
- (Experimental) Whether to give user-added jars precedence over Spark's own jars when
- loading classes in Executors. This feature can be used to mitigate conflicts between
- Spark's dependencies and user dependencies. It is currently an experimental feature.
- (Currently, this setting does not work for YARN, see <a href="https://issues.apache.org/jira/browse/SPARK-2996">SPARK-2996</a> for more details).
+ (Experimental) Same functionality as <code>spark.driver.userClassPathFirst</code>, but
+ applied to executor instances.
</td>
</tr>
<tr>
@@ -865,8 +872,8 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.network.timeout</code></td>
<td>120</td>
<td>
- Default timeout for all network interactions, in seconds. This config will be used in
- place of <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
+ Default timeout for all network interactions, in seconds. This config will be used in
+ place of <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
<code>spark.storage.blockManagerSlaveTimeoutMs</code> or
<code>spark.shuffle.io.connectionTimeout</code>, if they are not configured.
</td>
@@ -911,8 +918,8 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.shuffle.io.preferDirectBufs</code></td>
<td>true</td>
<td>
- (Netty only) Off-heap buffers are used to reduce garbage collection during shuffle and cache
- block transfer. For environments where off-heap memory is tightly limited, users may wish to
+ (Netty only) Off-heap buffers are used to reduce garbage collection during shuffle and cache
+ block transfer. For environments where off-heap memory is tightly limited, users may wish to
turn this off to force all allocations from Netty to be on-heap.
</td>
</tr>
@@ -920,7 +927,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.shuffle.io.numConnectionsPerPeer</code></td>
<td>1</td>
<td>
- (Netty only) Connections between hosts are reused in order to reduce connection buildup for
+ (Netty only) Connections between hosts are reused in order to reduce connection buildup for
large clusters. For clusters with many hard disks and few hosts, this may result in insufficient
concurrency to saturate all disks, and so users may consider increasing this value.
</td>
@@ -930,7 +937,7 @@ Apart from these, the following properties are also available, and may be useful
<td>3</td>
<td>
(Netty only) Fetches that fail due to IO-related exceptions are automatically retried if this is
- set to a non-zero value. This retry logic helps stabilize large shuffles in the face of long GC
+ set to a non-zero value. This retry logic helps stabilize large shuffles in the face of long GC
pauses or transient network connectivity issues.
</td>
</tr>
@@ -939,7 +946,7 @@ Apart from these, the following properties are also available, and may be useful
<td>5</td>
<td>
(Netty only) Seconds to wait between retries of fetches. The maximum delay caused by retrying
- is simply <code>maxRetries * retryWait</code>, by default 15 seconds.
+ is simply <code>maxRetries * retryWait</code>, by default 15 seconds.
</td>
</tr>
</table>
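
The two properties documented above are ordinary Spark configuration entries, so they can be enabled programmatically or via --conf on spark-submit (the latter is what the new SparkSubmitSuite test does). A minimal sketch:

    // Give user jars precedence on both the driver and the executors.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.driver.userClassPathFirst", "true")
      .set("spark.executor.userClassPathFirst", "true")
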
diff --git a/pom.xml b/pom.xml
index f6f176d200..a9e968af25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -342,7 +342,7 @@
</exclusion>
</exclusions>
</dependency>
-
+
<!-- Shaded deps marked as provided. These are promoted to compile scope
in the modules where we want the shaded classes to appear in the
associated jar. -->
@@ -395,7 +395,7 @@
<scope>provided</scope>
</dependency>
<!-- End of shaded deps -->
-
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -1178,13 +1178,19 @@
</includes>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<argLine>-Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+ <environmentVariables>
+ <!--
+ Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
+ launched by the tests have access to the correct test-time classpath.
+ -->
+ <SPARK_DIST_CLASSPATH>${test_classpath}</SPARK_DIST_CLASSPATH>
+ </environmentVariables>
<systemProperties>
<java.awt.headless>true</java.awt.headless>
<spark.test.home>${session.executionRootDirectory}</spark.test.home>
<spark.testing>1</spark.testing>
<spark.ui.enabled>false</spark.ui.enabled>
<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
- <spark.executor.extraClassPath>${test_classpath}</spark.executor.extraClassPath>
<spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
</systemProperties>
<failIfNoTests>false</failIfNoTests>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 95f8dfa3d2..8fb1239b4a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -411,6 +411,10 @@ object TestSettings {
lazy val settings = Seq (
// Fork new JVMs for tests and set Java options for those
fork := true,
+ // Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
+ // launched by the tests have access to the correct test-time classpath.
+ envVars in Test += ("SPARK_DIST_CLASSPATH" ->
+ (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":")),
javaOptions in Test += "-Dspark.test.home=" + sparkHome,
javaOptions in Test += "-Dspark.testing=1",
javaOptions in Test += "-Dspark.port.maxRetries=100",
@@ -423,10 +427,6 @@ object TestSettings {
javaOptions in Test += "-ea",
javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
.split(" ").toSeq,
- // This places test scope jars on the classpath of executors during tests.
- javaOptions in Test +=
- "-Dspark.executor.extraClassPath=" + (fullClasspath in Test).value.files.
- map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
javaOptions += "-Xmx3g",
// Show full stack trace and duration in test cases.
testOptions in Test += Tests.Argument("-oDF"),
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4cc320c5d5..a9bf861d16 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.yarn
import scala.util.control.NonFatal
-import java.io.IOException
+import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException
-import java.net.Socket
+import java.net.{Socket, URL}
import java.util.concurrent.atomic.AtomicReference
import akka.actor._
@@ -38,7 +38,8 @@ import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader,
+ SignalLogger, Utils}
/**
* Common application master functionality for Spark on Yarn.
@@ -244,7 +245,6 @@ private[spark] class ApplicationMaster(
host: String,
port: String,
isClusterMode: Boolean): Unit = {
-
val driverUrl = AkkaUtils.address(
AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
@@ -453,12 +453,24 @@ private[spark] class ApplicationMaster(
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
System.setProperty("spark.executor.instances", args.numExecutors.toString)
+
+ val classpath = Client.getUserClasspath(sparkConf)
+ val urls = classpath.map { entry =>
+ new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+ }
+ val userClassLoader =
+ if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
+ new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+ } else {
+ new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+ }
+
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
System.setProperty("spark.submit.pyFiles",
PythonRunner.formatPaths(args.pyFiles).mkString(","))
}
- val mainMethod = Class.forName(args.userClass, false,
- Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+ val mainMethod = userClassLoader.loadClass(args.userClass)
+ .getMethod("main", classOf[Array[String]])
val userThread = new Thread {
override def run() {
@@ -483,6 +495,7 @@ private[spark] class ApplicationMaster(
}
}
}
+ userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8afc1ccdad..46d9df9348 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -183,8 +183,7 @@ private[spark] class Client(
private[yarn] def copyFileToRemote(
destDir: Path,
srcPath: Path,
- replication: Short,
- setPerms: Boolean = false): Path = {
+ replication: Short): Path = {
val destFs = destDir.getFileSystem(hadoopConf)
val srcFs = srcPath.getFileSystem(hadoopConf)
var destPath = srcPath
@@ -193,9 +192,7 @@ private[spark] class Client(
logInfo(s"Uploading resource $srcPath -> $destPath")
FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
destFs.setReplication(destPath, replication)
- if (setPerms) {
- destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
- }
+ destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
} else {
logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
}
@@ -239,23 +236,22 @@ private[spark] class Client(
/**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
- * Each resource is represented by a 4-tuple of:
+ * Each resource is represented by a 3-tuple of:
* (1) destination resource name,
* (2) local path to the resource,
- * (3) Spark property key to set if the scheme is not local, and
- * (4) whether to set permissions for this resource
+ * (3) Spark property key to set if the scheme is not local
*/
List(
- (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false),
- (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true),
- ("log4j.properties", oldLog4jConf.orNull, null, false)
- ).foreach { case (destName, _localPath, confKey, setPermissions) =>
+ (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
+ (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
+ ("log4j.properties", oldLog4jConf.orNull, null)
+ ).foreach { case (destName, _localPath, confKey) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
if (!localPath.isEmpty()) {
val localURI = new URI(localPath)
if (localURI.getScheme != LOCAL_SCHEME) {
val src = getQualifiedLocalPath(localURI, hadoopConf)
- val destPath = copyFileToRemote(dst, src, replication, setPermissions)
+ val destPath = copyFileToRemote(dst, src, replication)
val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
distCacheMgr.addResource(destFs, hadoopConf, destPath,
localResources, LocalResourceType.FILE, destName, statCache)
@@ -707,7 +703,7 @@ object Client extends Logging {
* Return the path to the given application's staging directory.
*/
private def getAppStagingDir(appId: ApplicationId): String = {
- SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+ buildPath(SPARK_STAGING, appId.toString())
}
/**
@@ -783,7 +779,13 @@ object Client extends Logging {
/**
* Populate the classpath entry in the given environment map.
- * This includes the user jar, Spark jar, and any extra application jars.
+ *
+ * User jars are generally not added to the JVM's system classpath; those are handled by the AM
+ * and executor backend. When the deprecated `spark.yarn.user.classpath.first` is used, user jars
+ * are included in the system classpath, though. The extra class path and other uploaded files are
+ * always made available through the system class path.
+ *
+ * @param args Client arguments (when starting the AM) or null (when starting executors).
*/
private[yarn] def populateClasspath(
args: ClientArguments,
@@ -795,48 +797,38 @@ object Client extends Logging {
addClasspathEntry(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
)
-
- // Normally the users app.jar is last in case conflicts with spark jars
if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
- addUserClasspath(args, sparkConf, env)
- addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
- populateHadoopClasspath(conf, env)
- } else {
- addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
- populateHadoopClasspath(conf, env)
- addUserClasspath(args, sparkConf, env)
+ val userClassPath =
+ if (args != null) {
+ getUserClasspath(Option(args.userJar), Option(args.addJars))
+ } else {
+ getUserClasspath(sparkConf)
+ }
+ userClassPath.foreach { x =>
+ addFileToClasspath(x, null, env)
+ }
}
-
- // Append all jar files under the working directory to the classpath.
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + "*", env
- )
+ addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+ populateHadoopClasspath(conf, env)
+ sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env))
}
/**
- * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
- * to the classpath.
+ * Returns a list of URIs representing the user classpath.
+ *
+ * @param conf Spark configuration.
*/
- private def addUserClasspath(
- args: ClientArguments,
- conf: SparkConf,
- env: HashMap[String, String]): Unit = {
-
- // If `args` is not null, we are launching an AM container.
- // Otherwise, we are launching executor containers.
- val (mainJar, secondaryJars) =
- if (args != null) {
- (args.userJar, args.addJars)
- } else {
- (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null))
- }
+ def getUserClasspath(conf: SparkConf): Array[URI] = {
+ getUserClasspath(conf.getOption(CONF_SPARK_USER_JAR),
+ conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+ }
- addFileToClasspath(mainJar, APP_JAR, env)
- if (secondaryJars != null) {
- secondaryJars.split(",").filter(_.nonEmpty).foreach { jar =>
- addFileToClasspath(jar, null, env)
- }
- }
+ private def getUserClasspath(
+ mainJar: Option[String],
+ secondaryJars: Option[String]): Array[URI] = {
+ val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_))
+ val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_))
+ (mainUri ++ secondaryUris).toArray
}
/**
@@ -847,27 +839,19 @@ object Client extends Logging {
*
* If not a "local:" file and no alternate name, the environment is not modified.
*
- * @param path Path to add to classpath (optional).
+ * @param uri URI to add to classpath (optional).
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
*/
private def addFileToClasspath(
- path: String,
+ uri: URI,
fileName: String,
env: HashMap[String, String]): Unit = {
- if (path != null) {
- scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
- val uri = new URI(path)
- if (uri.getScheme == LOCAL_SCHEME) {
- addClasspathEntry(uri.getPath, env)
- return
- }
- }
- }
- if (fileName != null) {
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + fileName, env
- )
+ if (uri != null && uri.getScheme == LOCAL_SCHEME) {
+ addClasspathEntry(uri.getPath, env)
+ } else if (fileName != null) {
+ addClasspathEntry(buildPath(
+ YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
}
}
@@ -963,4 +947,23 @@ object Client extends Logging {
new Path(qualifiedURI)
}
+ /**
+   * Returns whether jars provided by the user should take precedence over the Spark jars when
+   * loading user classes.
+ */
+ def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = {
+ if (isDriver) {
+ conf.getBoolean("spark.driver.userClassPathFirst", false)
+ } else {
+ conf.getBoolean("spark.executor.userClassPathFirst", false)
+ }
+ }
+
+ /**
+ * Joins all the path components using Path.SEPARATOR.
+ */
+ def buildPath(components: String*): String = {
+ components.mkString(Path.SEPARATOR)
+ }
+
}
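
As a quick orientation on the new Client helpers above, here is a minimal standalone sketch (not the patched Client itself; the APP_JAR value is an assumption) showing how getUserClasspath and buildPath compose:

import java.net.URI
import org.apache.hadoop.fs.Path

object ClasspathHelpersSketch {
  // Assumed alternate name for the user jar in the container's working directory.
  val APP_JAR = "__app__.jar"

  // Mirrors getUserClasspath above: the main jar (or its alternate name) plus any
  // comma-separated secondary jars, returned as URIs.
  def getUserClasspath(mainJar: Option[String], secondaryJars: Option[String]): Array[URI] = {
    val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_))
    val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_))
    (mainUri ++ secondaryUris).toArray
  }

  // Mirrors buildPath above: join container-relative path components with Path.SEPARATOR.
  def buildPath(components: String*): String = components.mkString(Path.SEPARATOR)

  def main(args: Array[String]): Unit = {
    getUserClasspath(Some("local:/opt/app.jar"), Some("a.jar,b.jar")).foreach(println)
    println(buildPath("{{PWD}}", "a.jar")) // {{PWD}}/a.jar
  }
}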
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7cd8c5f0f9..6d5b8fda76 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy.yarn
+import java.io.File
import java.net.URI
import java.nio.ByteBuffer
@@ -57,7 +58,7 @@ class ExecutorRunnable(
var nmClient: NMClient = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
lazy val env = prepareEnvironment(container)
-
+
def run = {
logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
@@ -185,6 +186,16 @@ class ExecutorRunnable(
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+ val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
+ val absPath =
+ if (new File(uri.getPath()).isAbsolute()) {
+ uri.getPath()
+ } else {
+ Client.buildPath(Environment.PWD.$(), uri.getPath())
+ }
+ Seq("--user-class-path", "file:" + absPath)
+ }.toSeq
+
val commands = prefixEnv ++ Seq(
YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
"-server",
@@ -196,11 +207,13 @@ class ExecutorRunnable(
"-XX:OnOutOfMemoryError='kill %p'") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
- masterAddress.toString,
- slaveId.toString,
- hostname.toString,
- executorCores.toString,
- appId,
+ "--driver-url", masterAddress.toString,
+ "--executor-id", slaveId.toString,
+ "--hostname", hostname.toString,
+ "--cores", executorCores.toString,
+ "--app-id", appId) ++
+ userClassPath ++
+ Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties
index 287c8e3563..aab41fa494 100644
--- a/yarn/src/test/resources/log4j.properties
+++ b/yarn/src/test/resources/log4j.properties
@@ -16,7 +16,7 @@
#
# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
+log4j.rootCategory=DEBUG, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
@@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
-org.eclipse.jetty.LEVEL=WARN
+log4j.logger.org.apache.hadoop=WARN
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 2bb3dcffd6..f8f8129d22 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -82,6 +82,7 @@ class ClientSuite extends FunSuite with Matchers {
test("Local jar URIs") {
val conf = new Configuration()
val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
+ .set("spark.yarn.user.classpath.first", "true")
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
@@ -98,13 +99,10 @@ class ClientSuite extends FunSuite with Matchers {
})
if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
cp should contain("{{PWD}}")
- cp should contain(s"{{PWD}}${Path.SEPARATOR}*")
} else if (Utils.isWindows) {
cp should contain("%PWD%")
- cp should contain(s"%PWD%${Path.SEPARATOR}*")
} else {
cp should contain(Environment.PWD.$())
- cp should contain(s"${Environment.PWD.$()}${File.separator}*")
}
cp should not contain (Client.SPARK_JAR)
cp should not contain (Client.APP_JAR)
@@ -117,7 +115,7 @@ class ClientSuite extends FunSuite with Matchers {
val client = spy(new Client(args, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), anyShort(), anyBoolean())
+ any(classOf[Path]), anyShort())
val tempDir = Utils.createTempDir()
try {
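
As a hedged aside on the assertions above, a tiny hypothetical helper (not part of ClientSuite) of the same flavor: split a CLASSPATH string and check which entries must or must not appear.

// Hypothetical helper; the separator is passed in rather than assuming the local
// File.pathSeparator.
def checkClasspath(classpath: String, sep: String,
    expected: Seq[String], forbidden: Seq[String]): Unit = {
  val cp = classpath.split(sep).toSeq
  expected.foreach(e => assert(cp.contains(e), s"missing $e in $cp"))
  forbidden.foreach(f => assert(!cp.contains(f), s"unexpected $f in $cp"))
}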
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index e39de82740..0e37276ba7 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -17,27 +17,34 @@
package org.apache.spark.deploy.yarn
-import java.io.File
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.collection.mutable
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
+import com.google.common.io.ByteStreams
import com.google.common.io.Files
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.util.Utils
+/**
+ * Integration tests for YARN; these tests use a mini YARN cluster to run Spark-on-YARN
+ * applications, and require the Spark assembly to be built before they can run
+ * successfully.
+ */
class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
- // log4j configuration for the Yarn containers, so that their output is collected
- // by Yarn instead of trying to overwrite unit-tests.log.
+ // log4j configuration for the YARN containers, so that their output is collected
+ // by YARN instead of trying to overwrite unit-tests.log.
private val LOG4J_CONF = """
|log4j.rootCategory=DEBUG, console
|log4j.appender.console=org.apache.log4j.ConsoleAppender
@@ -52,13 +59,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
|
|from pyspark import SparkConf , SparkContext
|if __name__ == "__main__":
- | if len(sys.argv) != 3:
- | print >> sys.stderr, "Usage: test.py [master] [result file]"
+ | if len(sys.argv) != 2:
+ | print >> sys.stderr, "Usage: test.py [result file]"
| exit(-1)
- | conf = SparkConf()
- | conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode")
- | sc = SparkContext(conf=conf)
- | status = open(sys.argv[2],'w')
+ | sc = SparkContext(conf=SparkConf())
+ | status = open(sys.argv[1],'w')
| result = "failure"
| rdd = sc.parallelize(range(10))
| cnt = rdd.count()
@@ -72,23 +77,17 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
- private var oldConf: Map[String, String] = _
+ private var logConfDir: File = _
override def beforeAll() {
super.beforeAll()
tempDir = Utils.createTempDir()
-
- val logConfDir = new File(tempDir, "log4j")
+ logConfDir = new File(tempDir, "log4j")
logConfDir.mkdir()
val logConfFile = new File(logConfDir, "log4j.properties")
- Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
-
- val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
- sys.props("java.class.path")
-
- oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
+ Files.write(LOG4J_CONF, logConfFile, UTF_8)
yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster.init(new YarnConfiguration())
@@ -119,99 +118,165 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
- config.foreach { e =>
- sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
- }
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
- val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
- sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
- sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome)
- sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
- sys.props += ("spark.executor.instances" -> "1")
- sys.props += ("spark.driver.extraClassPath" -> childClasspath)
- sys.props += ("spark.executor.extraClassPath" -> childClasspath)
- sys.props += ("spark.executor.extraJavaOptions" -> "-Dfoo=\"one two three\"")
- sys.props += ("spark.driver.extraJavaOptions" -> "-Dfoo=\"one two three\"")
}
override def afterAll() {
yarnCluster.stop()
- sys.props.retain { case (k, v) => !k.startsWith("spark.") }
- sys.props ++= oldConf
super.afterAll()
}
test("run Spark in yarn-client mode") {
- var result = File.createTempFile("result", null, tempDir)
- YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
- checkResult(result)
-
- // verify log urls are present
- YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
- assert(info.logUrlMap.nonEmpty)
- }
+ testBasicYarnApp(true)
}
test("run Spark in yarn-cluster mode") {
- val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
- var result = File.createTempFile("result", null, tempDir)
-
- val args = Array("--class", main,
- "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--arg", result.getAbsolutePath(),
- "--num-executors", "1")
- Client.main(args)
- checkResult(result)
-
- // verify log urls are present.
- YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
- assert(info.logUrlMap.nonEmpty)
- }
+ testBasicYarnApp(false)
}
test("run Spark in yarn-cluster mode unsuccessfully") {
- val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
-
- // Use only one argument so the driver will fail
- val args = Array("--class", main,
- "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--num-executors", "1")
+ // Don't provide arguments so the driver will fail.
val exception = intercept[SparkException] {
- Client.main(args)
+ runSpark(false, mainClassName(YarnClusterDriver.getClass))
+ fail("Spark application should have failed.")
}
- assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
}
test("run Python application in yarn-cluster mode") {
val primaryPyFile = new File(tempDir, "test.py")
- Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8)
+ Files.write(TEST_PYFILE, primaryPyFile, UTF_8)
val pyFile = new File(tempDir, "test2.py")
- Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8)
+ Files.write(TEST_PYFILE, pyFile, UTF_8)
var result = File.createTempFile("result", null, tempDir)
- val args = Array("--class", "org.apache.spark.deploy.PythonRunner",
- "--primary-py-file", primaryPyFile.getAbsolutePath(),
- "--py-files", pyFile.getAbsolutePath(),
- "--arg", "yarn-cluster",
- "--arg", result.getAbsolutePath(),
- "--name", "python test in yarn-cluster mode",
- "--num-executors", "1")
- Client.main(args)
+ // The sbt assembly does not include pyspark / py4j python dependencies, so we need to
+ // propagate SPARK_HOME so that those are added to PYTHONPATH. See PythonUtils.scala.
+ val sparkHome = sys.props("spark.test.home")
+ val extraConf = Map(
+ "spark.executorEnv.SPARK_HOME" -> sparkHome,
+ "spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
+
+ runSpark(false, primaryPyFile.getAbsolutePath(),
+ sparkArgs = Seq("--py-files", pyFile.getAbsolutePath()),
+ appArgs = Seq(result.getAbsolutePath()),
+ extraConf = extraConf)
checkResult(result)
}
+ test("user class path first in client mode") {
+ testUseClassPathFirst(true)
+ }
+
+ test("user class path first in cluster mode") {
+ testUseClassPathFirst(false)
+ }
+
+ private def testBasicYarnApp(clientMode: Boolean): Unit = {
+ var result = File.createTempFile("result", null, tempDir)
+ runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
+ appArgs = Seq(result.getAbsolutePath()))
+ checkResult(result)
+ }
+
+ private def testUseClassPathFirst(clientMode: Boolean): Unit = {
+ // Create a jar file that contains a different version of "test.resource".
+ val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+ val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir)
+ val driverResult = File.createTempFile("driver", null, tempDir)
+ val executorResult = File.createTempFile("executor", null, tempDir)
+ runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+ appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+ extraClassPath = Seq(originalJar.getPath()),
+ extraJars = Seq("local:" + userJar.getPath()),
+ extraConf = Map(
+ "spark.driver.userClassPathFirst" -> "true",
+ "spark.executor.userClassPathFirst" -> "true"))
+ checkResult(driverResult, "OVERRIDDEN")
+ checkResult(executorResult, "OVERRIDDEN")
+ }
+
+ private def runSpark(
+ clientMode: Boolean,
+ klass: String,
+ appArgs: Seq[String] = Nil,
+ sparkArgs: Seq[String] = Nil,
+ extraClassPath: Seq[String] = Nil,
+ extraJars: Seq[String] = Nil,
+ extraConf: Map[String, String] = Map()): Unit = {
+ val master = if (clientMode) "yarn-client" else "yarn-cluster"
+ val props = new Properties()
+
+ props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+
+ val childClasspath = logConfDir.getAbsolutePath() +
+ File.pathSeparator +
+ sys.props("java.class.path") +
+ File.pathSeparator +
+ extraClassPath.mkString(File.pathSeparator)
+ props.setProperty("spark.driver.extraClassPath", childClasspath)
+ props.setProperty("spark.executor.extraClassPath", childClasspath)
+
+ // SPARK-4267: make sure java options are propagated correctly.
+ props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
+ props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")
+
+ yarnCluster.getConfig().foreach { e =>
+ props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
+ }
+
+ sys.props.foreach { case (k, v) =>
+ if (k.startsWith("spark.")) {
+ props.setProperty(k, v)
+ }
+ }
+
+ extraConf.foreach { case (k, v) => props.setProperty(k, v) }
+
+ val propsFile = File.createTempFile("spark", ".properties", tempDir)
+ val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8)
+ props.store(writer, "Spark properties.")
+ writer.close()
+
+ val extraJarArgs = if (!extraJars.isEmpty()) Seq("--jars", extraJars.mkString(",")) else Nil
+ val mainArgs =
+ if (klass.endsWith(".py")) {
+ Seq(klass)
+ } else {
+ Seq("--class", klass, fakeSparkJar.getAbsolutePath())
+ }
+ val argv =
+ Seq(
+ new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(),
+ "--master", master,
+ "--num-executors", "1",
+ "--properties-file", propsFile.getAbsolutePath()) ++
+ extraJarArgs ++
+ sparkArgs ++
+ mainArgs ++
+ appArgs
+
+ Utils.executeAndGetOutput(argv,
+ extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+ }
+
/**
* This is a workaround for an issue with yarn-cluster mode: the Client class does not report
* any error when the job process finishes successfully but the job itself fails. So the tests
* require that something be written to a file once everything is ok, to indicate that the job
* succeeded.
*/
- private def checkResult(result: File) = {
- var resultString = Files.toString(result, Charsets.UTF_8)
- resultString should be ("success")
+ private def checkResult(result: File): Unit = {
+ checkResult(result, "success")
+ }
+
+ private def checkResult(result: File, expected: String): Unit = {
+ var resultString = Files.toString(result, UTF_8)
+ resultString should be (expected)
+ }
+
+ private def mainClassName(klass: Class[_]): String = {
+ klass.getName().stripSuffix("$")
}
}
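
For reference, a hedged usage sketch of the runSpark helper defined above, written as an additional test inside this suite; the test name and the "--name" argument are illustrative and not part of the patch:

  test("yarn-client run with an explicit app name (sketch)") {
    val result = File.createTempFile("result", null, tempDir)
    runSpark(clientMode = true, klass = mainClassName(YarnClusterDriver.getClass),
      sparkArgs = Seq("--name", "yarn-cluster-suite-sketch"),
      appArgs = Seq(result.getAbsolutePath()))
    checkResult(result)
  }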
@@ -229,22 +294,22 @@ private object YarnClusterDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000
var listener: SaveExecutorInfo = null
- def main(args: Array[String]) = {
- if (args.length != 2) {
+ def main(args: Array[String]): Unit = {
+ if (args.length != 1) {
System.err.println(
s"""
|Invalid command line: ${args.mkString(" ")}
|
- |Usage: YarnClusterDriver [master] [result file]
+ |Usage: YarnClusterDriver [result file]
""".stripMargin)
System.exit(1)
}
listener = new SaveExecutorInfo
- val sc = new SparkContext(new SparkConf().setMaster(args(0))
+ val sc = new SparkContext(new SparkConf()
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
sc.addSparkListener(listener)
- val status = new File(args(1))
+ val status = new File(args(0))
var result = "failure"
try {
val data = sc.parallelize(1 to 4, 4).collect().toSet
@@ -253,7 +318,48 @@ private object YarnClusterDriver extends Logging with Matchers {
result = "success"
} finally {
sc.stop()
- Files.write(result, status, Charsets.UTF_8)
+ Files.write(result, status, UTF_8)
+ }
+
+ // verify log urls are present
+ listener.addedExecutorInfos.values.foreach { info =>
+ assert(info.logUrlMap.nonEmpty)
+ }
+ }
+
+}
+
+private object YarnClasspathTest {
+
+ def main(args: Array[String]): Unit = {
+ if (args.length != 2) {
+ System.err.println(
+ s"""
+ |Invalid command line: ${args.mkString(" ")}
+ |
+ |Usage: YarnClasspathTest [driver result file] [executor result file]
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ readResource(args(0))
+ val sc = new SparkContext(new SparkConf())
+ try {
+ sc.parallelize(Seq(1)).foreach { x => readResource(args(1)) }
+ } finally {
+ sc.stop()
+ }
+ }
+
+ private def readResource(resultPath: String): Unit = {
+ var result = "failure"
+ try {
+ val ccl = Thread.currentThread().getContextClassLoader()
+ val resource = ccl.getResourceAsStream("test.resource")
+ val bytes = ByteStreams.toByteArray(resource)
+ result = new String(bytes, 0, bytes.length, UTF_8)
+ } finally {
+ Files.write(result, new File(resultPath), UTF_8)
}
}
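
To see why the user-classpath-first tests write two jars with different "test.resource" contents, here is a standalone sketch (plain JDK URLClassLoader, not Spark's class loader): resource lookup follows the loader's search order, and spark.{driver,executor}.userClassPathFirst controls whether user jars are searched before the rest of the classpath.

import java.net.{URL, URLClassLoader}

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.ByteStreams

object ResourceOrderSketch {
  // Returns the contents of "test.resource" from whichever jar the loader searches first.
  def firstTestResource(jars: Array[URL]): String = {
    val loader = new URLClassLoader(jars, null) // null parent: only these jars are consulted
    val in = loader.getResourceAsStream("test.resource")
    try new String(ByteStreams.toByteArray(in), UTF_8) finally in.close()
  }
}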