aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/Logging.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/JettyUtils.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala15
-rw-r--r--core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala32
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala7
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala3
15 files changed, 78 insertions, 35 deletions
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 9d429dceeb..50d8e93e1f 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -22,6 +22,7 @@ import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
@@ -115,8 +116,7 @@ trait Logging {
val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized && usingLog4j) {
val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
- val classLoader = this.getClass.getClassLoader
- Option(classLoader.getResource(defaultLogProps)) match {
+ Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
log.info(s"Using Spark's default log4j profile: $defaultLogProps")
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index c12bd922d4..f89b2bffd1 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -292,7 +292,7 @@ private[spark] class Executor(
* created by the interpreter to the search path
*/
private def createClassLoader(): MutableURLClassLoader = {
- val loader = this.getClass.getClassLoader
+ val currentLoader = Utils.getContextOrSparkClassLoader
// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
@@ -301,8 +301,8 @@ private[spark] class Executor(
}.toArray
val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
userClassPathFirst match {
- case true => new ChildExecutorURLClassLoader(urls, loader)
- case false => new ExecutorURLClassLoader(urls, loader)
+ case true => new ChildExecutorURLClassLoader(urls, currentLoader)
+ case false => new ExecutorURLClassLoader(urls, currentLoader)
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index df36a06485..6fc702fdb1 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -50,21 +50,13 @@ private[spark] class MesosExecutorBackend
executorInfo: ExecutorInfo,
frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
- val cl = Thread.currentThread.getContextClassLoader
- try {
- // Work around for SPARK-1480
- Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
- logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
- this.driver = driver
- val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
- executor = new Executor(
- executorInfo.getExecutorId.getValue,
- slaveInfo.getHostname,
- properties)
- } finally {
- // Work around for SPARK-1480
- Thread.currentThread.setContextClassLoader(cl)
- }
+ logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
+ this.driver = driver
+ val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
+ executor = new Executor(
+ executorInfo.getExecutorId.getValue,
+ slaveInfo.getHostname,
+ properties)
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 3e3e18c353..1b7a5d1f19 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
import scala.util.matching.Regex
import org.apache.spark.Logging
+import org.apache.spark.util.Utils
private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
@@ -50,7 +51,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
try {
is = configFile match {
case Some(f) => new FileInputStream(f)
- case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
+ case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
}
if (is != null) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 083fb895d8..0b381308b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -54,7 +54,6 @@ private[spark] object ResultTask {
def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
{
- val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val ser = SparkEnv.get.closureSerializer.newInstance()
val objIn = ser.deserializeStream(in)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index e4eced383c..6c5827f75e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -23,6 +23,7 @@ import java.util.{NoSuchElementException, Properties}
import scala.xml.XML
import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.Utils
/**
* An interface to build Schedulable tree
@@ -72,7 +73,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
- getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+ Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index cb4ad4ae93..c9ad2b151d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -85,13 +85,13 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
try {
if (serializedData != null && serializedData.limit() > 0) {
reason = serializer.get().deserialize[TaskEndReason](
- serializedData, getClass.getClassLoader)
+ serializedData, Utils.getSparkClassLoader)
}
} catch {
case cnd: ClassNotFoundException =>
// Log an error but keep going here -- the task failed, so not catastropic if we can't
// deserialize the reason.
- val loader = Thread.currentThread.getContextClassLoader
+ val loader = Utils.getContextOrSparkClassLoader
logError(
"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
case ex: Throwable => {}
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 5e5883554f..e9163deaf2 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.Utils
private[spark] class JavaSerializationStream(out: OutputStream, counterReset: Int)
extends SerializationStream {
@@ -86,7 +87,7 @@ private[spark] class JavaSerializerInstance(counterReset: Int) extends Serialize
}
def deserializeStream(s: InputStream): DeserializationStream = {
- new JavaDeserializationStream(s, Thread.currentThread.getContextClassLoader)
+ new JavaDeserializationStream(s, Utils.getContextOrSparkClassLoader)
}
def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 62a4e3d0f6..3ae147a36c 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -33,6 +33,7 @@ import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
/**
* Utilities for launching a web server using Jetty's HTTP Server class
@@ -124,7 +125,7 @@ private[spark] object JettyUtils extends Logging {
contextHandler.setInitParameter("org.eclipse.jetty.servlet.Default.gzip", "false")
val staticHandler = new DefaultServlet
val holder = new ServletHolder(staticHandler)
- Option(getClass.getClassLoader.getResource(resourceBase)) match {
+ Option(Utils.getSparkClassLoader.getResource(resourceBase)) match {
case Some(res) =>
holder.setInitParameter("resourceBase", res.toString)
case None =>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 166f48ce73..a3af4e7b91 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -117,6 +117,21 @@ private[spark] object Utils extends Logging {
}
/**
+ * Get the ClassLoader which loaded Spark.
+ */
+ def getSparkClassLoader = getClass.getClassLoader
+
+ /**
+ * Get the Context ClassLoader on this thread or, if not present, the ClassLoader that
+ * loaded Spark.
+ *
+ * This should be used whenever passing a ClassLoader to Class.ForName or finding the currently
+ * active loader when setting up ClassLoader delegation chains.
+ */
+ def getContextOrSparkClassLoader =
+ Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)
+
+ /**
* Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
*/
def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
index c40cfc0696..e2050e95a1 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala
@@ -17,12 +17,12 @@
package org.apache.spark.executor
-import java.io.File
import java.net.URLClassLoader
import org.scalatest.FunSuite
-import org.apache.spark.TestUtils
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkException, TestUtils}
+import org.apache.spark.util.Utils
class ExecutorURLClassLoaderSuite extends FunSuite {
@@ -63,5 +63,33 @@ class ExecutorURLClassLoaderSuite extends FunSuite {
}
}
+ test("driver sets context class loader in local mode") {
+ // Test the case where the driver program sets a context classloader and then runs a job
+ // in local mode. This is what happens when ./spark-submit is called with "local" as the
+ // master.
+ val original = Thread.currentThread().getContextClassLoader
+ val className = "ClassForDriverTest"
+ val jar = TestUtils.createJarWithClasses(Seq(className))
+ val contextLoader = new URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
+ Thread.currentThread().setContextClassLoader(contextLoader)
+
+ val sc = new SparkContext("local", "driverLoaderTest")
+
+ try {
+ sc.makeRDD(1 to 5, 2).mapPartitions { x =>
+ val loader = Thread.currentThread().getContextClassLoader
+ Class.forName(className, true, loader).newInstance()
+ Seq().iterator
+ }.count()
+ }
+ catch {
+ case e: SparkException if e.getMessage.contains("ClassNotFoundException") =>
+ fail("Local executor could not find class", e)
+ case t: Throwable => fail("Unexpected exception ", t)
+ }
+
+ sc.stop()
+ Thread.currentThread().setContextClassLoader(original)
+ }
}
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 5a367b6bb7..beb40e8702 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -39,6 +39,7 @@ import scala.reflect.api.{Mirror, TypeCreator, Universe => ApiUniverse}
import org.apache.spark.Logging
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
/** The Scala interactive shell. It provides a read-eval-print loop
* around the Interpreter class.
@@ -130,7 +131,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
def history = in.history
/** The context class loader at the time this object was created */
- protected val originalClassLoader = Thread.currentThread.getContextClassLoader
+ protected val originalClassLoader = Utils.getContextOrSparkClassLoader
// classpath entries added via :cp
var addedClasspath: String = ""
@@ -177,7 +178,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
override lazy val formatting = new Formatting {
def prompt = SparkILoop.this.prompt
}
- override protected def parentClassLoader = SparkHelper.explicitParentLoader(settings).getOrElse(classOf[SparkILoop].getClassLoader)
+ override protected def parentClassLoader = SparkHelper.explicitParentLoader(settings).getOrElse(classOf[SparkILoop].getClassLoader)
}
/** Create a new interpreter. */
@@ -871,7 +872,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
}
val u: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
- val m = u.runtimeMirror(getClass.getClassLoader)
+ val m = u.runtimeMirror(Utils.getSparkClassLoader)
private def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] =
u.TypeTag[T](
m,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index a001d95359..49fc4f70fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst
import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
+import org.apache.spark.util.{Utils => SparkUtils}
+
package object util {
/**
* Returns a path to a temporary file that probably does not exist.
@@ -54,7 +56,7 @@ package object util {
def resourceToString(
resource:String,
encoding: String = "UTF-8",
- classLoader: ClassLoader = this.getClass.getClassLoader) = {
+ classLoader: ClassLoader = SparkUtils.getSparkClassLoader) = {
val inStream = classLoader.getResourceAsStream(resource)
val outStream = new ByteArrayOutputStream
try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index df8220b556..e92cf5ac4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -26,6 +26,7 @@ import scala.reflect.runtime.universe.runtimeMirror
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar._
+import org.apache.spark.util.Utils
private[sql] case object PassThrough extends CompressionScheme {
override val typeId = 0
@@ -254,7 +255,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
private val dictionary = {
// TODO Can we clean up this mess? Maybe move this to `DataType`?
implicit val classTag = {
- val mirror = runtimeMirror(getClass.getClassLoader)
+ val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[T#JvmType](mirror.runtimeClass(columnType.scalaTag.tpe))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index d8e1b970c1..c30ae5bcc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -25,6 +25,7 @@ import com.esotericsoftware.kryo.{Serializer, Kryo}
import org.apache.spark.{SparkEnv, SparkConf}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.MutablePair
+import org.apache.spark.util.Utils
class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
override def newKryo(): Kryo = {
@@ -44,7 +45,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
kryo.setReferences(false)
- kryo.setClassLoader(this.getClass.getClassLoader)
+ kryo.setClassLoader(Utils.getSparkClassLoader)
kryo
}
}