aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-02-09 21:17:06 -0800
committerAndrew Or <andrew@databricks.com>2015-02-09 21:17:28 -0800
commit20a6013106b56a1a1cc3e8cda092330ffbe77cc3 (patch)
tree381068b96cbb3278ce4c2fd50bbf185fe1d6c3e9 /core
parent36c4e1d75933dc843acb747b91dc12e75ad1df42 (diff)
downloadspark-20a6013106b56a1a1cc3e8cda092330ffbe77cc3.tar.gz
spark-20a6013106b56a1a1cc3e8cda092330ffbe77cc3.tar.bz2
spark-20a6013106b56a1a1cc3e8cda092330ffbe77cc3.zip
[SPARK-2996] Implement userClassPathFirst for driver, yarn.
Yarn's config option `spark.yarn.user.classpath.first` does not work the same way as `spark.files.userClassPathFirst`; Yarn's version is a lot more dangerous, in that it modifies the system classpath, instead of restricting the changes to the user's class loader. So this change implements the behavior of the latter for Yarn, and deprecates the more dangerous choice. To be able to achieve feature-parity, I also implemented the option for drivers (the existing option only applies to executors). So now there are two options, each controlling whether to apply userClassPathFirst to the driver or executors. The old option was deprecated, and aliased to the new one (`spark.executor.userClassPathFirst`). The existing "child-first" class loader also had to be fixed. It didn't handle resources, and it was also doing some things that ended up causing JVM errors depending on how things were being called. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #3233 from vanzin/SPARK-2996 and squashes the following commits: 9cf9cf1 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a1499e2 [Marcelo Vanzin] Remove SPARK_HOME propagation. fa7df88 [Marcelo Vanzin] Remove 'test.resource' file, create it dynamically. a8c69f1 [Marcelo Vanzin] Review feedback. cabf962 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a1b8d7e [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 3f768e3 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 2ce3c7a [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 0e6d6be [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 70d4044 [Marcelo Vanzin] Fix pyspark/yarn-cluster test. 0fe7777 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 0e6ef19 [Marcelo Vanzin] Move class loaders around and make names more meaninful. fe970a7 [Marcelo Vanzin] Review feedback. 25d4fed [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 3cb6498 [Marcelo Vanzin] Call the right loadClass() method on the parent. fbb8ab5 [Marcelo Vanzin] Add locking in loadClass() to avoid deadlocks. 2e6c4b7 [Marcelo Vanzin] Mention new setting in documentation. b6497f9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 a10f379 [Marcelo Vanzin] Some feedback. 3730151 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 f513871 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 44010b6 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 7b57cba [Marcelo Vanzin] Remove now outdated message. 5304d64 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 35949c8 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 54e1a98 [Marcelo Vanzin] Merge branch 'master' into SPARK-2996 d1273b2 [Marcelo Vanzin] Add test file to rat exclude. fa1aafa [Marcelo Vanzin] Remove write check on user jars. 89d8072 [Marcelo Vanzin] Cleanups. a963ea3 [Marcelo Vanzin] Implement spark.driver.userClassPathFirst for standalone cluster mode. 50afa5f [Marcelo Vanzin] Fix Yarn executor command line. 7d14397 [Marcelo Vanzin] Register user jars in executor up front. 7f8603c [Marcelo Vanzin] Fix yarn-cluster mode without userClassPathFirst. 20373f5 [Marcelo Vanzin] Fix ClientBaseSuite. 55c88fa [Marcelo Vanzin] Run all Yarn integration tests via spark-submit. 0b64d92 [Marcelo Vanzin] Add deprecation warning to yarn option. 4a84d87 [Marcelo Vanzin] Fix the child-first class loader. d0394b8 [Marcelo Vanzin] Add "deprecated configs" to SparkConf. 46d8cf2 [Marcelo Vanzin] Update doc with new option, change name to "userClassPathFirst". a314f2d [Marcelo Vanzin] Enable driver class path isolation in SparkSubmit. 91f7e54 [Marcelo Vanzin] [yarn] Enable executor class path isolation. a853e74 [Marcelo Vanzin] Re-work CoarseGrainedExecutorBackend command line arguments. 89522ef [Marcelo Vanzin] Add class path isolation support for Yarn cluster mode.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala83
-rw-r--r--core/src/main/scala/org/apache/spark/TestUtils.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/Client.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala83
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala52
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala84
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala103
-rw-r--r--core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/SparkConfSuite.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala27
-rw-r--r--core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala (renamed from core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala)12
18 files changed, 403 insertions, 161 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 13aa9960ac..0dbd26146c 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet
@@ -67,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value for " + key)
}
- settings.put(key, value)
+ settings.put(translateConfKey(key, warn = true), value)
this
}
@@ -139,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
- settings.putIfAbsent(key, value)
+ settings.putIfAbsent(translateConfKey(key, warn = true), value)
this
}
@@ -175,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
- Option(settings.get(key))
+ Option(settings.get(translateConfKey(key)))
}
/** Get all parameters as a list of pairs */
@@ -228,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getAppId: String = get("spark.app.id")
/** Does the configuration contain a given parameter? */
- def contains(key: String): Boolean = settings.containsKey(key)
+ def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
/** Copy this object */
override def clone: SparkConf = {
@@ -285,7 +286,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
// Validate memory fractions
val memoryKeys = Seq(
"spark.storage.memoryFraction",
- "spark.shuffle.memoryFraction",
+ "spark.shuffle.memoryFraction",
"spark.shuffle.safetyFraction",
"spark.storage.unrollFraction",
"spark.storage.safetyFraction")
@@ -351,9 +352,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def toDebugString: String = {
getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
}
+
}
-private[spark] object SparkConf {
+private[spark] object SparkConf extends Logging {
+
+ private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
+ val configs = Seq(
+ DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
+ "1.3"),
+ DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
+ "Use spark.{driver,executor}.userClassPathFirst instead."))
+ configs.map { x => (x.oldName, x) }.toMap
+ }
+
/**
* Return whether the given config is an akka config (e.g. akka.actor.provider).
* Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
@@ -380,4 +392,63 @@ private[spark] object SparkConf {
def isSparkPortConf(name: String): Boolean = {
(name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
}
+
+ /**
+ * Translate the configuration key if it is deprecated and has a replacement, otherwise just
+ * returns the provided key.
+ *
+ * @param userKey Configuration key from the user / caller.
+ * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
+ * only once for each key.
+ */
+ def translateConfKey(userKey: String, warn: Boolean = false): String = {
+ deprecatedConfigs.get(userKey)
+ .map { deprecatedKey =>
+ if (warn) {
+ deprecatedKey.warn()
+ }
+ deprecatedKey.newName.getOrElse(userKey)
+ }.getOrElse(userKey)
+ }
+
+ /**
+ * Holds information about keys that have been deprecated or renamed.
+ *
+ * @param oldName Old configuration key.
+ * @param newName New configuration key, or `null` if key has no replacement, in which case the
+ * deprecated key will be used (but the warning message will still be printed).
+ * @param version Version of Spark where key was deprecated.
+ * @param deprecationMessage Message to include in the deprecation warning; mandatory when
+ * `newName` is not provided.
+ */
+ private case class DeprecatedConfig(
+ oldName: String,
+ _newName: String,
+ version: String,
+ deprecationMessage: String = null) {
+
+ private val warned = new AtomicBoolean(false)
+ val newName = Option(_newName)
+
+ if (newName == null && (deprecationMessage == null || deprecationMessage.isEmpty())) {
+ throw new IllegalArgumentException("Need new config name or deprecation message.")
+ }
+
+ def warn(): Unit = {
+ if (warned.compareAndSet(false, true)) {
+ if (newName != null) {
+ val message = Option(deprecationMessage).getOrElse(
+ s"Please use the alternative '$newName' instead.")
+ logWarning(
+ s"The configuration option '$oldName' has been replaced as of Spark $version and " +
+ s"may be removed in the future. $message")
+ } else {
+ logWarning(
+ s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
+ s"may be removed in the future. $deprecationMessage")
+ }
+ }
+ }
+
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index be081c3825..35b324ba6f 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -17,12 +17,13 @@
package org.apache.spark
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.net.{URI, URL}
import java.util.jar.{JarEntry, JarOutputStream}
import scala.collection.JavaConversions._
+import com.google.common.base.Charsets.UTF_8
import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
@@ -59,6 +60,22 @@ private[spark] object TestUtils {
createJar(files1 ++ files2, jarFile)
}
+ /**
+ * Create a jar file containing multiple files. The `files` map contains a mapping of
+ * file names in the jar file to their contents.
+ */
+ def createJarWithFiles(files: Map[String, String], dir: File = null): URL = {
+ val tempDir = Option(dir).getOrElse(Utils.createTempDir())
+ val jarFile = File.createTempFile("testJar", ".jar", tempDir)
+ val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
+ files.foreach { case (k, v) =>
+ val entry = new JarEntry(k)
+ jarStream.putNextEntry(entry)
+ ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
+ }
+ jarStream.close()
+ jarFile.toURI.toURL
+ }
/**
* Create a jar file that contains this set of files. All files will be located at the root
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 38b3da0b13..237d26fc6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -68,8 +68,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
- val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
- driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts)
+ val command = new Command(mainClass,
+ Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
+ sys.env, classPathEntries, libraryPathEntries, javaOpts)
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6d213926f3..c4bc5054d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,7 +37,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
import org.apache.spark.deploy.rest._
import org.apache.spark.executor._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
/**
* Whether to submit, kill, or request the status of an application.
@@ -467,11 +467,11 @@ object SparkSubmit {
}
val loader =
- if (sysProps.getOrElse("spark.files.userClassPathFirst", "false").toBoolean) {
- new ChildExecutorURLClassLoader(new Array[URL](0),
+ if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+ new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
- new ExecutorURLClassLoader(new Array[URL](0),
+ new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index b47a081053..fd514f0766 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -196,7 +196,7 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td sorttable_customkey={driver.desc.mem.toString}>
{Utils.megabytesToString(driver.desc.mem.toLong)}
</td>
- <td>{driver.desc.command.arguments(1)}</td>
+ <td>{driver.desc.command.arguments(2)}</td>
</tr>
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 2033d67e1f..6e4486e20f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -392,7 +392,7 @@ private class SubmitRequestServlet(
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
- Seq("{{WORKER_URL}}", mainClass) ++ appArgs, // args to the DriverWrapper
+ Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 28cab36c7b..b964a09bdb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -74,10 +74,15 @@ private[spark] class DriverRunner(
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
- // Make sure user application jar is on the classpath
+ def substituteVariables(argument: String): String = argument match {
+ case "{{WORKER_URL}}" => workerUrl
+ case "{{USER_JAR}}" => localJarFilename
+ case other => other
+ }
+
// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
- sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename))
+ sparkHome.getAbsolutePath, substituteVariables)
launchDriver(builder, driverDir, driverDesc.supervise)
}
catch {
@@ -111,12 +116,6 @@ private[spark] class DriverRunner(
}
}
- /** Replace variables in a command argument passed to us */
- private def substituteVariables(argument: String): String = argument match {
- case "{{WORKER_URL}}" => workerUrl
- case other => other
- }
-
/**
* Creates the working directory for this driver.
* Will throw an exception if there are errors preparing the directory.
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 05e242e6df..ab467a5ee8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -17,10 +17,12 @@
package org.apache.spark.deploy.worker
+import java.io.File
+
import akka.actor._
import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
/**
* Utility object for launching driver programs such that they share fate with the Worker process.
@@ -28,21 +30,31 @@ import org.apache.spark.util.{AkkaUtils, Utils}
object DriverWrapper {
def main(args: Array[String]) {
args.toList match {
- case workerUrl :: mainClass :: extraArgs =>
+ case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
+ val currentLoader = Thread.currentThread.getContextClassLoader
+ val userJarUrl = new File(userJar).toURI().toURL()
+ val loader =
+ if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
+ new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
+ } else {
+ new MutableURLClassLoader(Array(userJarUrl), currentLoader)
+ }
+ Thread.currentThread.setContextClassLoader(loader)
+
// Delegate to supplied main class
- val clazz = Class.forName(args(1))
+ val clazz = Class.forName(mainClass, true, loader)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
actorSystem.shutdown()
case _ =>
- System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")
+ System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
System.exit(-1)
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3a42f8b157..dd19e4947d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -17,8 +17,10 @@
package org.apache.spark.executor
+import java.net.URL
import java.nio.ByteBuffer
+import scala.collection.mutable
import scala.concurrent.Await
import akka.actor.{Actor, ActorSelection, Props}
@@ -38,6 +40,7 @@ private[spark] class CoarseGrainedExecutorBackend(
executorId: String,
hostPort: String,
cores: Int,
+ userClassPath: Seq[URL],
env: SparkEnv)
extends Actor with ActorLogReceive with ExecutorBackend with Logging {
@@ -63,7 +66,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
val (hostname, _) = Utils.parseHostPort(hostPort)
- executor = new Executor(executorId, hostname, env, isLocal = false)
+ executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -117,7 +120,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
hostname: String,
cores: Int,
appId: String,
- workerUrl: Option[String]) {
+ workerUrl: Option[String],
+ userClassPath: Seq[URL]) {
SignalLogger.register(log)
@@ -162,7 +166,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val sparkHostPort = hostname + ":" + boundPort
env.actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend],
- driverUrl, executorId, sparkHostPort, cores, env),
+ driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
name = "Executor")
workerUrl.foreach { url =>
env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
@@ -172,20 +176,69 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
def main(args: Array[String]) {
- args.length match {
- case x if x < 5 =>
- System.err.println(
+ var driverUrl: String = null
+ var executorId: String = null
+ var hostname: String = null
+ var cores: Int = 0
+ var appId: String = null
+ var workerUrl: Option[String] = None
+ val userClassPath = new mutable.ListBuffer[URL]()
+
+ var argv = args.toList
+ while (!argv.isEmpty) {
+ argv match {
+ case ("--driver-url") :: value :: tail =>
+ driverUrl = value
+ argv = tail
+ case ("--executor-id") :: value :: tail =>
+ executorId = value
+ argv = tail
+ case ("--hostname") :: value :: tail =>
+ hostname = value
+ argv = tail
+ case ("--cores") :: value :: tail =>
+ cores = value.toInt
+ argv = tail
+ case ("--app-id") :: value :: tail =>
+ appId = value
+ argv = tail
+ case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
- "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
- "<cores> <appid> [<workerUrl>] ")
- System.exit(1)
+ workerUrl = Some(value)
+ argv = tail
+ case ("--user-class-path") :: value :: tail =>
+ userClassPath += new URL(value)
+ argv = tail
+ case Nil =>
+ case tail =>
+ System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
+ printUsageAndExit()
+ }
+ }
- // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
- // and CoarseMesosSchedulerBackend (for mesos mode).
- case 5 =>
- run(args(0), args(1), args(2), args(3).toInt, args(4), None)
- case x if x > 5 =>
- run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
+ if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
+ appId == null) {
+ printUsageAndExit()
}
+
+ run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
+
+ private def printUsageAndExit() = {
+ System.err.println(
+ """
+ |"Usage: CoarseGrainedExecutorBackend [options]
+ |
+ | Options are:
+ | --driver-url <driverUrl>
+ | --executor-id <executorId>
+ | --hostname <hostname>
+ | --cores <cores>
+ | --app-id <appid>
+ | --worker-url <workerUrl>
+ | --user-class-path <url>
+ |""".stripMargin)
+ System.exit(1)
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5141483d1e..6b22dcd6f5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor
import java.io.File
import java.lang.management.ManagementFactory
+import java.net.URL
import java.nio.ByteBuffer
import java.util.concurrent._
@@ -33,7 +34,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{SparkUncaughtExceptionHandler, AkkaUtils, Utils}
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader,
+ SparkUncaughtExceptionHandler, AkkaUtils, Utils}
/**
* Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -43,6 +45,7 @@ private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
+ userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false)
extends Logging
{
@@ -288,17 +291,23 @@ private[spark] class Executor(
* created by the interpreter to the search path
*/
private def createClassLoader(): MutableURLClassLoader = {
+ // Bootstrap the list of jars with the user class path.
+ val now = System.currentTimeMillis()
+ userClassPath.foreach { url =>
+ currentJars(url.getPath().split("/").last) = now
+ }
+
val currentLoader = Utils.getContextOrSparkClassLoader
// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
- val urls = currentJars.keySet.map { uri =>
+ val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
new File(uri.split("/").last).toURI.toURL
- }.toArray
- val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
- userClassPathFirst match {
- case true => new ChildExecutorURLClassLoader(urls, currentLoader)
- case false => new ExecutorURLClassLoader(urls, currentLoader)
+ }
+ if (conf.getBoolean("spark.executor.userClassPathFirst", false)) {
+ new ChildFirstURLClassLoader(urls, currentLoader)
+ } else {
+ new MutableURLClassLoader(urls, currentLoader)
}
}
@@ -311,7 +320,7 @@ private[spark] class Executor(
if (classUri != null) {
logInfo("Using REPL class URI: " + classUri)
val userClassPathFirst: java.lang.Boolean =
- conf.getBoolean("spark.files.userClassPathFirst", false)
+ conf.getBoolean("spark.executor.userClassPathFirst", false)
try {
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
@@ -344,18 +353,23 @@ private[spark] class Executor(
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentFiles(name) = timestamp
}
- for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
- logInfo("Fetching " + name + " with timestamp " + timestamp)
- // Fetch file with useCache mode, close cache for local mode.
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
- env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
- currentJars(name) = timestamp
- // Add it to our class loader
+ for ((name, timestamp) <- newJars) {
val localName = name.split("/").last
- val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
- if (!urlClassLoader.getURLs.contains(url)) {
- logInfo("Adding " + url + " to class loader")
- urlClassLoader.addURL(url)
+ val currentTimeStamp = currentJars.get(name)
+ .orElse(currentJars.get(localName))
+ .getOrElse(-1L)
+ if (currentTimeStamp < timestamp) {
+ logInfo("Fetching " + name + " with timestamp " + timestamp)
+ // Fetch file with useCache mode, close cache for local mode.
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+ env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+ currentJars(name) = timestamp
+ // Add it to our class loader
+ val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
+ if (!urlClassLoader.getURLs.contains(url)) {
+ logInfo("Adding " + url + " to class loader")
+ urlClassLoader.addURL(url)
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
deleted file mode 100644
index 8011e75944..0000000000
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import java.net.{URLClassLoader, URL}
-
-import org.apache.spark.util.ParentClassLoader
-
-/**
- * The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
- * We also make changes so user classes can come before the default classes.
- */
-
-private[spark] trait MutableURLClassLoader extends ClassLoader {
- def addURL(url: URL)
- def getURLs: Array[URL]
-}
-
-private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
- extends MutableURLClassLoader {
-
- private object userClassLoader extends URLClassLoader(urls, null){
- override def addURL(url: URL) {
- super.addURL(url)
- }
- override def findClass(name: String): Class[_] = {
- val loaded = super.findLoadedClass(name)
- if (loaded != null) {
- return loaded
- }
- try {
- super.findClass(name)
- } catch {
- case e: ClassNotFoundException => {
- parentClassLoader.loadClass(name)
- }
- }
- }
- }
-
- private val parentClassLoader = new ParentClassLoader(parent)
-
- override def findClass(name: String): Class[_] = {
- try {
- userClassLoader.findClass(name)
- } catch {
- case e: ClassNotFoundException => {
- parentClassLoader.loadClass(name)
- }
- }
- }
-
- def addURL(url: URL) {
- userClassLoader.addURL(url)
- }
-
- def getURLs() = {
- userClassLoader.getURLs()
- }
-}
-
-private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
- extends URLClassLoader(urls, parent) with MutableURLClassLoader {
-
- override def addURL(url: URL) {
- super.addURL(url)
- }
-}
-
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index d2e1680a5f..40fc6b59cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -52,8 +52,13 @@ private[spark] class SparkDeploySchedulerBackend(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
- "{{WORKER_URL}}")
+ val args = Seq(
+ "--driver-url", driverUrl,
+ "--executor-id", "{{EXECUTOR_ID}}",
+ "--hostname", "{{HOSTNAME}}",
+ "--cores", "{{CORES}}",
+ "--app-id", "{{APP_ID}}",
+ "--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 0d1c2a916c..90dfe14352 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -154,18 +154,25 @@ private[spark] class CoarseMesosSchedulerBackend(
if (uri == null) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
- "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
- prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue,
- offer.getHostname, numCores, appId))
+ "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
+ .format(prefixEnv, runScript) +
+ s" --driver-url $driverUrl" +
+ s" --executor-id ${offer.getSlaveId.getValue}" +
+ s" --hostname ${offer.getHostname}" +
+ s" --cores $numCores" +
+ s" --app-id $appId")
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
- ("cd %s*; %s " +
- "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
- .format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue,
- offer.getHostname, numCores, appId))
+ s"cd $basename*; $prefixEnv " +
+ "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
+ s" --driver-url $driverUrl" +
+ s" --executor-id ${offer.getSlaveId.getValue}" +
+ s" --hostname ${offer.getHostname}" +
+ s" --cores $numCores" +
+ s" --app-id $appId")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()
diff --git a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
new file mode 100644
index 0000000000..d9c7103b2f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.{URLClassLoader, URL}
+import java.util.Enumeration
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.util.ParentClassLoader
+
+/**
+ * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
+ */
+private[spark] class MutableURLClassLoader(urls: Array[URL], parent: ClassLoader)
+ extends URLClassLoader(urls, parent) {
+
+ override def addURL(url: URL): Unit = {
+ super.addURL(url)
+ }
+
+ override def getURLs(): Array[URL] = {
+ super.getURLs()
+ }
+
+}
+
+/**
+ * A mutable class loader that gives preference to its own URLs over the parent class loader
+ * when loading classes and resources.
+ */
+private[spark] class ChildFirstURLClassLoader(urls: Array[URL], parent: ClassLoader)
+ extends MutableURLClassLoader(urls, null) {
+
+ private val parentClassLoader = new ParentClassLoader(parent)
+
+ /**
+ * Used to implement fine-grained class loading locks similar to what is done by Java 7. This
+ * prevents deadlock issues when using non-hierarchical class loaders.
+ *
+ * Note that due to Java 6 compatibility (and some issues with implementing class loaders in
+ * Scala), Java 7's `ClassLoader.registerAsParallelCapable` method is not called.
+ */
+ private val locks = new ConcurrentHashMap[String, Object]()
+
+ override def loadClass(name: String, resolve: Boolean): Class[_] = {
+ var lock = locks.get(name)
+ if (lock == null) {
+ val newLock = new Object()
+ lock = locks.putIfAbsent(name, newLock)
+ if (lock == null) {
+ lock = newLock
+ }
+ }
+
+ lock.synchronized {
+ try {
+ super.loadClass(name, resolve)
+ } catch {
+ case e: ClassNotFoundException =>
+ parentClassLoader.loadClass(name, resolve)
+ }
+ }
+ }
+
+ override def getResource(name: String): URL = {
+ val url = super.findResource(name)
+ val res = if (url != null) url else parentClassLoader.getResource(name)
+ res
+ }
+
+ override def getResources(name: String): Enumeration[URL] = {
+ val urls = super.findResources(name)
+ val res =
+ if (urls != null && urls.hasMoreElements()) {
+ urls
+ } else {
+ parentClassLoader.getResources(name)
+ }
+ res
+ }
+
+ override def addURL(url: URL) {
+ super.addURL(url)
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
index 3abc12681f..6d8d9e8da3 100644
--- a/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/ParentClassLoader.scala
@@ -18,7 +18,7 @@
package org.apache.spark.util
/**
- * A class loader which makes findClass accesible to the child
+ * A class loader which makes some protected methods in ClassLoader accesible.
*/
private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(parent) {
@@ -29,4 +29,9 @@ private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(
override def loadClass(name: String): Class[_] = {
super.loadClass(name)
}
+
+ override def loadClass(name: String, resolve: Boolean): Class[_] = {
+ super.loadClass(name, resolve)
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e08210ae60..ea6b73bc68 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -197,6 +197,18 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
serializer.newInstance().serialize(new StringBuffer())
}
+ test("deprecated config keys") {
+ val conf = new SparkConf()
+ .set("spark.files.userClassPathFirst", "true")
+ .set("spark.yarn.user.classpath.first", "true")
+ assert(conf.contains("spark.files.userClassPathFirst"))
+ assert(conf.contains("spark.executor.userClassPathFirst"))
+ assert(conf.contains("spark.yarn.user.classpath.first"))
+ assert(conf.getBoolean("spark.files.userClassPathFirst", false))
+ assert(conf.getBoolean("spark.executor.userClassPathFirst", false))
+ assert(conf.getBoolean("spark.yarn.user.classpath.first", false))
+ }
+
}
class Class1 {}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1ddccae126..46d745c4ec 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,6 +21,8 @@ import java.io._
import scala.collection.mutable.ArrayBuffer
+import com.google.common.base.Charsets.UTF_8
+import com.google.common.io.ByteStreams
import org.scalatest.FunSuite
import org.scalatest.Matchers
import org.scalatest.concurrent.Timeouts
@@ -450,6 +452,19 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
}
+ test("user classpath first in driver") {
+ val systemJar = TestUtils.createJarWithFiles(Map("test.resource" -> "SYSTEM"))
+ val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"))
+ val args = Seq(
+ "--class", UserClasspathFirstTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local",
+ "--conf", "spark.driver.extraClassPath=" + systemJar,
+ "--conf", "spark.driver.userClassPathFirst=true",
+ userJar.toString)
+ runSparkSubmit(args)
+ }
+
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
@@ -541,3 +556,15 @@ object SimpleApplicationTest {
}
}
}
+
+object UserClasspathFirstTest {
+ def main(args: Array[String]) {
+ val ccl = Thread.currentThread().getContextClassLoader()
+ val resource = ccl.getResourceAsStream("test.resource")
+ val bytes = ByteStreams.toByteArray(resource)
+ val contents = new String(bytes, 0, bytes.length, UTF_8)
+ if (contents != "USER") {
+ throw new SparkException("Should have read user resource, but instead read: " + contents)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index b7912c09d1..31e3b7e7bb 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.executor
+package org.apache.spark.util
import java.net.URLClassLoader
@@ -24,7 +24,7 @@ import org.scalatest.FunSuite
import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, TestUtils}
import org.apache.spark.util.Utils
-class ExecutorURLClassLoaderSuite extends FunSuite {
+class MutableURLClassLoaderSuite extends FunSuite {
val urls2 = List(TestUtils.createJarWithClasses(
classNames = Seq("FakeClass1", "FakeClass2", "FakeClass3"),
@@ -37,7 +37,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
@@ -47,7 +47,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("parent first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new MutableURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass1").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -57,7 +57,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first can fall back") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -65,7 +65,7 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
test("child first can fail") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
+ val classLoader = new ChildFirstURLClassLoader(urls, parentLoader)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("FakeClassDoesNotExist").newInstance()
}