Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                           |  83
-rw-r--r--  core/src/main/scala/org/apache/spark/TestUtils.scala                                           |  19
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala                                       |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                                  |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala                         |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala                    |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala                          |  15
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala                         |  20
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala               |  83
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                                   |  52
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala                     |  84
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala       |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala |  21
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala                          | 103
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala                              |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala                                      |  12
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala                             |  27
-rw-r--r--  core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala (renamed from core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala) | 12
18 files changed, 403 insertions(+), 161 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 13aa9960ac..0dbd26146c 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet
@@ -67,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value for " + key)
}
- settings.put(key, value)
+ settings.put(translateConfKey(key, warn = true), value)
this
}
@@ -139,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
- settings.putIfAbsent(key, value)
+ settings.putIfAbsent(translateConfKey(key, warn = true), value)
this
}
@@ -175,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
- Option(settings.get(key))
+ Option(settings.get(translateConfKey(key)))
}
/** Get all parameters as a list of pairs */
@@ -228,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getAppId: String = get("spark.app.id")
/** Does the configuration contain a given parameter? */
- def contains(key: String): Boolean = settings.containsKey(key)
+ def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
/** Copy this object */
override def clone: SparkConf = {
@@ -285,7 +286,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
// Validate memory fractions
val memoryKeys = Seq(
"spark.storage.memoryFraction",
- "spark.shuffle.memoryFraction",
+ "spark.shuffle.memoryFraction",
"spark.shuffle.safetyFraction",
"spark.storage.unrollFraction",
"spark.storage.safetyFraction")
@@ -351,9 +352,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def toDebugString: String = {
getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}
+
}
-private[spark] object SparkConf {
+private[spark] object SparkConf extends Logging {
+
+ private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
+ val configs = Seq(
+ DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
+ "1.3"),
+ DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
+ "Use spark.{driver,executor}.userClassPathFirst instead."))
+ configs.map { x => (x.oldName, x) }.toMap
+ }
+
/**
* Return whether the given config is an akka config (e.g. akka.actor.provider).
* Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
@@ -380,4 +392,63 @@ private[spark] object SparkConf {
def isSparkPortConf(name: String): Boolean = {
(name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
}
+
+ /**
+ * Translate the configuration key if it is deprecated and has a replacement, otherwise just
+ * returns the provided key.
+ *
+ * @param userKey Configuration key from the user / caller.
+ * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
+ * only once for each key.
+ */
+ def translateConfKey(userKey: String, warn: Boolean = false): String = {
+ deprecatedConfigs.get(userKey)
+ .map { deprecatedKey =>
+ if (warn) {
+ deprecatedKey.warn()
+ }
+ deprecatedKey.newName.getOrElse(userKey)
+ }.getOrElse(userKey)
+ }
+
+ /**
+ * Holds information about keys that have been deprecated or renamed.
+ *
+ * @param oldName Old configuration key.
+ * @param newName New configuration key, or `null` if key has no replacement, in which case the
+ * deprecated key will be used (but the warning message will still be printed).
+ * @param version Version of Spark where key was deprecated.
+ * @param deprecationMessage Message to include in the deprecation warning; mandatory when
+ * `newName` is not provided.
+ */
+ private case class DeprecatedConfig(
+ oldName: String,
+ _newName: String,
+ version: String,
+ deprecationMessage: String = null) {
+
+ private val warned = new AtomicBoolean(false)
+ val newName = Option(_newName)
+
+ if (newName.isEmpty && (deprecationMessage == null || deprecationMessage.isEmpty())) {
+ throw new IllegalArgumentException("Need new config name or deprecation message.")
+ }
+
+ def warn(): Unit = {
+ if (warned.compareAndSet(false, true)) {
+ if (newName.isDefined) {
+ val message = Option(deprecationMessage).getOrElse(
+ s"Please use the alternative '${newName.get}' instead.")
+ logWarning(
+ s"The configuration option '$oldName' has been replaced as of Spark $version and " +
+ s"may be removed in the future. $message")
+ } else {
+ logWarning(
+ s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
+ s"may be removed in the future. $deprecationMessage")
+ }
+ }
+ }
+
+ }
}
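
The net effect of the SparkConf changes above: a deprecated key is stored under its replacement (when one exists) and a warning is logged at most once per key, while reads through either name keep working because get/getOption/contains translate the lookup key as well. A minimal sketch of the expected behaviour, mirroring the SparkConfSuite test further down (the assertions are illustrative, not part of the patch):

  val conf = new SparkConf()
    .set("spark.files.userClassPathFirst", "true")   // deprecated as of 1.3; warns once
  // The value lands under the new key, but stays visible through the old name
  // because contains()/getBoolean() translate the key they are given.
  assert(conf.getBoolean("spark.executor.userClassPathFirst", false))
  assert(conf.contains("spark.files.userClassPathFirst"))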
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index be081c3825..35b324ba6f 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -17,12 +17,13 @@
package org.apache.spark
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.net.{URI, URL}
import java.util.jar.{JarEntry, JarOutputStream}
import scala.collection.JavaConversions._
+import com.google.common.base.Charsets.UTF_8
import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
@@ -59,6 +60,22 @@ private[spark] object TestUtils {
createJar(files1 ++ files2, jarFile)
}
+ /**
+ * Create a jar file containing multiple files. The `files` map contains a mapping of
+ * file names in the jar file to their contents.
+ */
+ def createJarWithFiles(files: Map[String, String], dir: File = null): URL = {
+ val tempDir = Option(dir).getOrElse(Utils.createTempDir())
+ val jarFile = File.createTempFile("testJar", ".jar", tempDir)
+ val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
+ files.foreach { case (k, v) =>
+ val entry = new JarEntry(k)
+ jarStream.putNextEntry(entry)
+ ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
+ }
+ jarStream.close()
+ jarFile.toURI.toURL
+ }
/**
* Create a jar file that contains this set of files. All files will be located at the root
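
createJarWithFiles complements the existing createJar/createJarWithClasses helpers by packaging literal strings as jar entries, which is what the driver-classpath test added to SparkSubmitSuite below relies on. A small usage sketch (entry name and contents taken from that test):

  // Returns a java.net.URL to a temporary .jar whose single entry
  // "test.resource" contains the UTF-8 bytes of "USER".
  val userJar: java.net.URL = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"))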
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 38b3da0b13..237d26fc6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -68,8 +68,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
- val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
- driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
+ val command = new Command(mainClass,
+ Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
+ sys.env, classPathEntries, libraryPathEntries, javaOpts)
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6d213926f3..c4bc5054d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,7 +37,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
import org.apache.spark.deploy.rest._
import org.apache.spark.executor._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
/**
* Whether to submit, kill, or request the status of an application.
@@ -467,11 +467,11 @@ object SparkSubmit {
}
val loader =
- if (sysProps.getOrElse("spark.files.userClassPathFirst", "false").toBoolean) {
- new ChildExecutorURLClassLoader(new Array[URL](0),
+ if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+ new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
- new ExecutorURLClassLoader(new Array[URL](0),
+ new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index b47a081053..fd514f0766 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -196,7 +196,7 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td sorttable_customkey={driver.desc.mem.toString}>
{Utils.megabytesToString(driver.desc.mem.toLong)}
</td>
- <td>{driver.desc.command.arguments(1)}</td>
+ <td>{driver.desc.command.arguments(2)}</td>
</tr>
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 2033d67e1f..6e4486e20f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -392,7 +392,7 @@ private class SubmitRequestServlet(
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
- Seq("{{WORKER_URL}}", mainClass) ++ appArgs, // args to the DriverWrapper
+ Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 28cab36c7b..b964a09bdb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -74,10 +74,15 @@ private[spark] class DriverRunner(
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
- // Make sure user application jar is on the classpath
+ def substituteVariables(argument: String): String = argument match {
+ case "{{WORKER_URL}}" => workerUrl
+ case "{{USER_JAR}}" => localJarFilename
+ case other => other
+ }
+
// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
- sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename))
+ sparkHome.getAbsolutePath, substituteVariables)
launchDriver(builder, driverDir, driverDesc.supervise)
}
catch {
@@ -111,12 +116,6 @@ private[spark] class DriverRunner(
}
}
- /** Replace variables in a command argument passed to us */
- private def substituteVariables(argument: String): String = argument match {
- case "{{WORKER_URL}}" => workerUrl
- case other => other
- }
-
/**
* Creates the working directory for this driver.
* Will throw an exception if there are errors preparing the directory.
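
Moving substituteVariables into the method that downloads the jar lets it close over localJarFilename, so the {{USER_JAR}} placeholder emitted by Client and StandaloneRestServer resolves to the jar the worker just fetched. A sketch of the substitution step; the worker URL and jar path are hypothetical examples:

  val localJarFilename = "/work/driver-20150210-0001/user-app.jar"   // hypothetical download path
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => "akka.tcp://sparkWorker@10.0.0.5:7078/user/Worker"  // hypothetical
    case "{{USER_JAR}}"   => localJarFilename
    case other            => other
  }
  // The driver command's arguments, e.g. Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass, ...),
  // are mapped through this function before the driver JVM is launched.
  val resolved = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.MyDriver").map(substituteVariables)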
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 05e242e6df..ab467a5ee8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -17,10 +17,12 @@
package org.apache.spark.deploy.worker
+import java.io.File
+
import akka.actor._
import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
/**
* Utility object for launching driver programs such that they share fate with the Worker process.
@@ -28,21 +30,31 @@ import org.apache.spark.util.{AkkaUtils, Utils}
object DriverWrapper {
def main(args: Array[String]) {
args.toList match {
- case workerUrl :: mainClass :: extraArgs =>
+ case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
+ val currentLoader = Thread.currentThread.getContextClassLoader
+ val userJarUrl = new File(userJar).toURI().toURL()
+ val loader =
+ if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+ new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
+ } else {
+ new MutableURLClassLoader(Array(userJarUrl), currentLoader)
+ }
+ Thread.currentThread.setContextClassLoader(loader)
+
// Delegate to supplied main class
- val clazz = Class.forName(args(1))
+ val clazz = Class.forName(mainClass, true, loader)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
actorSystem.shutdown()
case _ =>
- System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")
+ System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
System.exit(-1)
}
}
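
For reference, an illustrative argument vector for the new DriverWrapper signature, i.e. what main receives after the worker has substituted {{WORKER_URL}} and {{USER_JAR}} (all values hypothetical):

  val exampleArgs = Array(
    "akka.tcp://sparkWorker@10.0.0.5:7078/user/Worker",   // workerUrl
    "/work/driver-20150210-0001/user-app.jar",            // userJar
    "com.example.MyDriver",                                // driverMainClass
    "--input", "hdfs:///data")                             // extra driver options
  // DriverWrapper.main(exampleArgs) wraps userJar in the chosen class loader and
  // then invokes com.example.MyDriver.main(Array("--input", "hdfs:///data")).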
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3a42f8b157..dd19e4947d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -17,8 +17,10 @@
package org.apache.spark.executor
+import java.net.URL
import java.nio.ByteBuffer
+import scala.collection.mutable
import scala.concurrent.Await
import akka.actor.{Actor, ActorSelection, Props}
@@ -38,6 +40,7 @@ private[spark] class CoarseGrainedExecutorBackend(
executorId: String,
hostPort: String,
cores: Int,
+ userClassPath: Seq[URL],
env: SparkEnv)
extends Actor with ActorLogReceive with ExecutorBackend with Logging {
@@ -63,7 +66,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
val (hostname, _) = Utils.parseHostPort(hostPort)
- executor = new Executor(executorId, hostname, env, isLocal = false)
+ executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -117,7 +120,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
hostname: String,
cores: Int,
appId: String,
- workerUrl: Option[String]) {
+ workerUrl: Option[String],
+ userClassPath: Seq[URL]) {
SignalLogger.register(log)
@@ -162,7 +166,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val sparkHostPort = hostname + ":" + boundPort
env.actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend],
- driverUrl, executorId, sparkHostPort, cores, env),
+ driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
name = "Executor")
workerUrl.foreach { url =>
env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
@@ -172,20 +176,69 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
def main(args: Array[String]) {
- args.length match {
- case x if x < 5 =>
- System.err.println(
+ var driverUrl: String = null
+ var executorId: String = null
+ var hostname: String = null
+ var cores: Int = 0
+ var appId: String = null
+ var workerUrl: Option[String] = None
+ val userClassPath = new mutable.ListBuffer[URL]()
+
+ var argv = args.toList
+ while (!argv.isEmpty) {
+ argv match {
+ case ("--driver-url") :: value :: tail =>
+ driverUrl = value
+ argv = tail
+ case ("--executor-id") :: value :: tail =>
+ executorId = value
+ argv = tail
+ case ("--hostname") :: value :: tail =>
+ hostname = value
+ argv = tail
+ case ("--cores") :: value :: tail =>
+ cores = value.toInt
+ argv = tail
+ case ("--app-id") :: value :: tail =>
+ appId = value
+ argv = tail
+ case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
- "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
- "<cores> <appid> [<workerUrl>] ")
- System.exit(1)
+ workerUrl = Some(value)
+ argv = tail
+ case ("--user-class-path") :: value :: tail =>
+ userClassPath += new URL(value)
+ argv = tail
+ case Nil =>
+ case tail =>
+ System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
+ printUsageAndExit()
+ }
+ }
- // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
- // and CoarseMesosSchedulerBackend (for mesos mode).
- case 5 =>
- run(args(0), args(1), args(2), args(3).toInt, args(4), None)
- case x if x > 5 =>
- run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
+ if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
+ appId == null) {
+ printUsageAndExit()
}
+
+ run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
+
+ private def printUsageAndExit() = {
+ System.err.println(
+ """
+ |Usage: CoarseGrainedExecutorBackend [options]
+ |
+ | Options are:
+ | --driver-url <driverUrl>
+ | --executor-id <executorId>
+ | --hostname <hostname>
+ | --cores <cores>
+ | --app-id <appid>
+ | --worker-url <workerUrl>
+ | --user-class-path <url>
+ |""".stripMargin)
+ System.exit(1)
+ }
+
}
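
Replacing the positional arguments with named flags lets the scheduler backends add optional arguments without breaking launchers, and --user-class-path may be repeated once per jar. An illustrative argument list as main would receive it (all values hypothetical):

  val exampleArgs = Array(
    "--driver-url", "akka.tcp://sparkDriver@10.0.0.2:7077/user/CoarseGrainedScheduler",
    "--executor-id", "3",
    "--hostname", "worker-2.example.com",
    "--cores", "8",
    "--app-id", "app-20150210000000-0001",
    "--worker-url", "akka.tcp://sparkWorker@worker-2.example.com:7078/user/Worker",
    "--user-class-path", "file:/opt/app/libs/user-dep.jar",
    "--user-class-path", "file:/opt/app/libs/another-dep.jar")
  // main(exampleArgs) parses this into driverUrl, executorId, hostname, cores = 8,
  // appId, Some(workerUrl) and a two-entry user class path, then calls run(...).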
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5141483d1e..6b22dcd6f5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor
import java.io.File
import java.lang.management.ManagementFactory
+import java.net.URL
import java.nio.ByteBuffer
import java.util.concurrent._
@@ -33,7 +34,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{SparkUncaughtExceptionHandler, AkkaUtils, Utils}
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader,
+ SparkUncaughtExceptionHandler, AkkaUtils, Utils}
/**
* Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -43,6 +45,7 @@ private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
+ userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false)
extends Logging
{
@@ -288,17 +291,23 @@ private[spark] class Executor(
* created by the interpreter to the search path
*/
private def createClassLoader(): MutableURLClassLoader = {
+ // Bootstrap the list of jars with the user class path.
+ val now = System.currentTimeMillis()
+ userClassPath.foreach { url =>
+ currentJars(url.getPath().split("/").last) = now
+ }
+
val currentLoader = Utils.getContextOrSparkClassLoader
// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
- val urls = currentJars.keySet.map { uri =>
+ val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
new File(uri.split("/").last).toURI.toURL
- }.toArray
- val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
- userClassPathFirst match {
- case true => new ChildExecutorURLClassLoader(urls, currentLoader)
- case false => new ExecutorURLClassLoader(urls, currentLoader)
+ }
+ if (conf.getBoolean("spark.executor.userClassPathFirst", false)) {
+ new ChildFirstURLClassLoader(urls, currentLoader)
+ } else {
+ new MutableURLClassLoader(urls, currentLoader)
}
}
@@ -311,7 +320,7 @@ private[spark] class Executor(
if (classUri != null) {
logInfo("Using REPL class URI: " + classUri)
val userClassPathFirst: java.lang.Boolean =
- conf.getBoolean("spark.files.userClassPathFirst", false)
+ conf.getBoolean("spark.executor.userClassPathFirst", false)
try {
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
@@ -344,18 +353,23 @@ private[spark] class Executor(
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentFiles(name) = timestamp
}
- for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
- logInfo("Fetching " + name + " with timestamp " + timestamp)
- // Fetch file with useCache mode, close cache for local mode.
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
- env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
- currentJars(name) = timestamp
- // Add it to our class loader
+ for ((name, timestamp) <- newJars) {
val localName = name.split("/").last
- val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
- if (!urlClassLoader.getURLs.contains(url)) {
- logInfo("Adding " + url + " to class loader")
- urlClassLoader.addURL(url)
+ val currentTimeStamp = currentJars.get(name)
+ .orElse(currentJars.get(localName))
+ .getOrElse(-1L)
+ if (currentTimeStamp < timestamp) {
+ logInfo("Fetching " + name + " with timestamp " + timestamp)
+ // Fetch file with useCache mode, close cache for local mode.
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+ env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+ currentJars(name) = timestamp
+ // Add it to our class loader
+ val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
+ if (!urlClassLoader.getURLs.contains(url)) {
+ logInfo("Adding " + url + " to class loader")
+ urlClassLoader.addURL(url)
+ }
}
}
}
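
A short sketch of the jar bookkeeping added above. Jars supplied through the executor's user class path are seeded into currentJars under their file name, so a later updateDependencies() entry for the same jar does not trigger a redundant fetch; names and values here are illustrative:

  import scala.collection.mutable.HashMap
  val currentJars = new HashMap[String, Long]()
  currentJars("user-dep.jar") = System.currentTimeMillis()       // seeded in createClassLoader
  val name = "http://fileserver:8080/jars/user-dep.jar"          // hypothetical newJars entry
  val localName = name.split("/").last                           // "user-dep.jar"
  val known = currentJars.get(name).orElse(currentJars.get(localName)).getOrElse(-1L)
  // Only a timestamp strictly newer than `known` causes the jar to be fetched
  // again and added to the executor's URL class loader.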
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
deleted file mode 100644
index 8011e75944..0000000000
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import java.net.{URLClassLoader, URL}
-
-import org.apache.spark.util.ParentClassLoader
-
-/**
- * The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
- * We also make changes so user classes can come before the default classes.
- */
-
-private[spark] trait MutableURLClassLoader extends ClassLoader {
- def addURL(url: URL)
- def getURLs: Array[URL]
-}
-
-private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
- extends MutableURLClassLoader {
-
- private object userClassLoader extends URLClassLoader(urls, null){
- override def addURL(url: URL) {
- super.addURL(url)
- }
- override def findClass(name: String): Class[_] = {
- val loaded = super.findLoadedClass(name)
- if (loaded != null) {
- return loaded
- }
- try {
- super.findClass(name)
- } catch {
- case e: ClassNotFoundException => {
- parentClassLoader.loadClass(name)
- }
- }
- }
- }
-
- private val parentClassLoader = new ParentClassLoader(parent)
-
- override def findClass(name: String): Class[_] = {
- try {
- userClassLoader.findClass(name)
- } catch {
- case e: ClassNotFoundException => {
- parentClassLoader.loadClass(name)
- }
- }
- }
-
- def addURL(url: URL) {
- userClassLoader.addURL(url)
- }
-
- def getURLs() = {
- userClassLoader.getURLs()
- }
-}
-
-private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
- extends URLClassLoader(urls, parent) with MutableURLClassLoader {
-
- override def addURL(url: URL) {
- super.addURL(url)
- }
-}
-
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index d2e1680a5f..40fc6b59cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -52,8 +52,13 @@ private[spark] class SparkDeploySchedulerBackend(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
- "{{WORKER_URL}}")
+ val args = Seq(
+ "--driver-url", driverUrl,
+ "--executor-id", "{{EXECUTOR_ID}}",
+ "--hostname", "{{HOSTNAME}}",
+ "--cores", "{{CORES}}",
+ "--app-id", "{{APP_ID}}",
+ "--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 0d1c2a916c..90dfe14352 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -154,18 +154,25 @@ private[spark] class CoarseMesosSchedulerBackend(
if (uri == null) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
- "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
- prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue,
- offer.getHostname, numCores, appId))
+ "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
+ .format(prefixEnv, runScript) +
+ s" --driver-url $driverUrl" +
+ s" --executor-id ${offer.getSlaveId.getValue}" +
+ s" --hostname ${offer.getHostname}" +
+ s" --cores $numCores" +
+ s" --app-id $appId")
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
- ("cd %s*; %s " +
- "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
- .format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue,
- offer.getHostname, numCores, appId))
+ s"cd $basename*; $prefixEnv " +
+ "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
+ s" --driver-url $driverUrl" +
+ s" --executor-id ${offer.getSlaveId.getValue}" +
+ s" --hostname ${offer.getHostname}" +
+ s" --cores $numCores" +
+ s" --app-id $appId")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()
diff --git a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
new file mode 100644
index 0000000000..d9c7103b2f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.{URLClassLoader, URL}
+import java.util.Enumeration
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.ParentClassLoader
+
+/**
+ * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
+ */
+private[spark] class MutableURLClassLoader(urls: Array[URL], parent: ClassLoader)
+ extends URLClassLoader(urls, parent) {
+
+ override def addURL(url: URL): Unit = {
+ super.addURL(url)
+ }
+
+ override def getURLs(): Array[URL] = {
+ super.getURLs()
+ }
+
+}
+
+/**
+ * A mutable class loader that gives preference to its own URLs over the parent class loader
+ * when loading classes and resources.
+ */
+private[spark] class ChildFirstURLClassLoader(urls: Array[URL], parent: ClassLoader)
+ extends MutableURLClassLoader(urls, null) {
+
+ private val parentClassLoader = new ParentClassLoader(parent)
+
+ /**
+ * Used to implement fine-grained class loading locks similar to what is done by Java 7. This
+ * prevents deadlock issues when using non-hierarchical class loaders.
+ *
+ * Note that due to Java 6 compatibility (and some issues with implementing class loaders in
+ * Scala), Java 7's `ClassLoader.registerAsParallelCapable` method is not called.
+ */
+ private val locks = new ConcurrentHashMap[String, Object]()
+
+ override def loadClass(name: String, resolve: Boolean): Class[_] = {
+ var lock = locks.get(name)
+ if (lock == null) {
+ val newLock = new Object()
+ lock = locks.putIfAbsent(name, newLock)
+ if (lock == null) {
+ lock = newLock
+ }
+ }
+
+ lock.synchronized {
+ try {
+ super.loadClass(name, resolve)
+ } catch {
+ case e: ClassNotFoundException =>
+ parentClassLoader.loadClass(name, resolve)
+ }
+ }
+ }
+
+ override def getResource(name: String): URL = {
+ val url = super.findResource(name)
+ val res = if (url != null) url else parentClassLoader.getResource(name)
+ res
+ }
+
+ override def getResources(name: String): Enumeration[URL] = {
+ val urls = super.findResources(name)
+ val res =
+ if (urls != null && urls.hasMoreElements()) {
+ urls
+ } else {
+ parentClassLoader.getResources(name)
+ }
+ res
+ }
+
+ override def addURL(url: URL) {
+ super.addURL(url)
+ }
+
+}
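
A short sketch contrasting the two loaders defined in this new file; the jar path and class name are hypothetical:

  import java.io.File
  val userJars = Array(new File("/opt/app/user.jar").toURI.toURL)
  val sparkLoader = Thread.currentThread.getContextClassLoader

  // Parent-first (standard JVM delegation): Spark's copy of a class wins over
  // the copy inside user.jar.
  val parentFirst = new MutableURLClassLoader(userJars, sparkLoader)

  // Child-first: user.jar is searched before delegating to sparkLoader, which is
  // wrapped in ParentClassLoader so the protected loadClass(name, resolve)
  // overload can be reached.
  val childFirst = new ChildFirstURLClassLoader(userJars, sparkLoader)
  // childFirst.loadClass("com.example.SharedDependency")  // would prefer the user.jar copy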
diff --git a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
index 3abc12681f..6d8d9e8da3 100644
--- a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
@@ -18,7 +18,7 @@
package org.apache.spark.util
/**
- * A class loader which makes findClass accesible to the child
+ * A class loader which makes some protected methods in ClassLoader accessible.
*/
private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(parent) {
@@ -29,4 +29,9 @@ private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(
override def loadClass(name: String): Class[_] = {
super.loadClass(name)
}
+
+ override def loadClass(name: String, resolve: Boolean): Class[_] = {
+ super.loadClass(name, resolve)
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e08210ae60..ea6b73bc68 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -197,6 +197,18 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
serializer.newInstance().serialize(new StringBuffer())
}
+ test("deprecated config keys") {
+ val conf = new SparkConf()
+ .set("spark.files.userClassPathFirst", "true")
+ .set("spark.yarn.user.classpath.first", "true")
+ assert(conf.contains("spark.files.userClassPathFirst"))
+ assert(conf.contains("spark.executor.userClassPathFirst"))
+ assert(conf.contains("spark.yarn.user.classpath.first"))
+ assert(conf.getBoolean("spark.files.userClassPathFirst", false))
+ assert(conf.getBoolean("spark.executor.userClassPathFirst", false))
+ assert(conf.getBoolean("spark.yarn.user.classpath.first", false))
+ }
+
}
class Class1 {}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1ddccae126..46d745c4ec 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,6 +21,8 @@ import java.io._
import scala.collection.mutable.ArrayBuffer
+import com.google.common.base.Charsets.UTF_8
+import com.google.common.io.ByteStreams
import org.scalatest.FunSuite
import org.scalatest.Matchers
import org.scalatest.concurrent.Timeouts
@@ -450,6 +452,19 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
}
+ test("user classpath first in driver") {
+ val systemJar = TestUtils.createJarWithFiles(Map("test.resource" -> "SYSTEM"))
+ val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"))
+ val args = Seq(
+ "--class", UserClasspathFirstTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local",
+ "--conf", "spark.driver.extraClassPath=" + systemJar,
+ "--conf", "spark.driver.userClassPathFirst=true",
+ userJar.toString)
+ runSparkSubmit(args)
+ }
+
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
@@ -541,3 +556,15 @@ object SimpleApplicationTest {
}
}
}
+
+object UserClasspathFirstTest {
+ def main(args: Array[String]) {
+ val ccl = Thread.currentThread().getContextClassLoader()
+ val resource = ccl.getResourceAsStream("test.resource")
+ val bytes = ByteStreams.toByteArray(resource)
+ val contents = new String(bytes, 0, bytes.length, UTF_8)
+ if (contents != "USER") {
+ throw new SparkException("Should have read user resource, but instead read: " + contents)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index b7912c09d1..31e3b7e7bb 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.executor
+package org.apache.spark.util
import java.net.URLClassLoader
@@ -24,7 +24,7 @@ import org.scalatest.FunSuite
import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, TestUtils}
import org.apache.spark.util.Utils
-class ExecutorURLClassLoaderSuite extends FunSuite {
+class MutableURLClassLoaderSuite extends FunSuite {
val urls2 = List(TestUtils.createJarWithClasses(
classNames = Seq("FakeClass1", "FakeClass2", "FakeClass3"),
@@ -37,7 +37,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
@@ -47,7 +47,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("parent first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new MutableURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass1").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -57,7 +57,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first can fall back") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -65,7 +65,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first can fail") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("FakeClassDoesNotExist").newInstance()
}