aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/Logging.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/io/CompressionCodec.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/SizeEstimator.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/FileSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala2
33 files changed, 73 insertions, 57 deletions
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 87ab099267..f0598816d6 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -159,7 +159,7 @@ private object Logging {
try {
// We use reflection here to handle the case where users remove the
// slf4j-to-jul bridge order to route their logs to JUL.
- val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
+ val bridgeClass = Utils.classForName("org.slf4j.bridge.SLF4JBridgeHandler")
bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
if (!installed) {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 82704b1ab2..bd1cc332a6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1968,7 +1968,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
for (className <- listenerClassNames) {
// Use reflection to find the right constructor
val constructors = {
- val listenerClass = Class.forName(className)
+ val listenerClass = Utils.classForName(className)
listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
}
val constructorTakingSparkConf = constructors.find { c =>
@@ -2503,7 +2503,7 @@ object SparkContext extends Logging {
"\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
}
val scheduler = try {
- val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
+ val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
@@ -2515,7 +2515,7 @@ object SparkContext extends Logging {
}
val backend = try {
val clazz =
- Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+ Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
@@ -2528,8 +2528,7 @@ object SparkContext extends Logging {
case "yarn-client" =>
val scheduler = try {
- val clazz =
- Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler")
+ val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
@@ -2541,7 +2540,7 @@ object SparkContext extends Logging {
val backend = try {
val clazz =
- Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+ Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d18fc599e9..adfece4d6e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -261,7 +261,7 @@ object SparkEnv extends Logging {
// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
- val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
+ val cls = Utils.classForName(className)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 4b8f7fe924..9658e9a696 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -26,6 +26,7 @@ import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
import org.apache.spark.Logging
import org.apache.spark.api.r.SerDe._
+import org.apache.spark.util.Utils
/**
* Handler for RBackend
@@ -88,21 +89,6 @@ private[r] class RBackendHandler(server: RBackend)
ctx.close()
}
- // Looks up a class given a class name. This function first checks the
- // current class loader and if a class is not found, it looks up the class
- // in the context class loader. Address [SPARK-5185]
- def getStaticClass(objId: String): Class[_] = {
- try {
- val clsCurrent = Class.forName(objId)
- clsCurrent
- } catch {
- // Use contextLoader if we can't find the JAR in the system class loader
- case e: ClassNotFoundException =>
- val clsContext = Class.forName(objId, true, Thread.currentThread().getContextClassLoader)
- clsContext
- }
- }
-
def handleMethodCall(
isStatic: Boolean,
objId: String,
@@ -113,7 +99,7 @@ private[r] class RBackendHandler(server: RBackend)
var obj: Object = null
try {
val cls = if (isStatic) {
- getStaticClass(objId)
+ Utils.classForName(objId)
} else {
JVMObjectTracker.get(objId) match {
case None => throw new IllegalArgumentException("Object not found " + objId)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index 685313ac00..fac6666bb3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.reflect.ClassTag
import org.apache.spark._
+import org.apache.spark.util.Utils
private[spark] class BroadcastManager(
val isDriver: Boolean,
@@ -42,7 +43,7 @@ private[spark] class BroadcastManager(
conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
broadcastFactory =
- Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
+ Utils.classForName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory.initialize(isDriver, conf, securityManager)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6d14590a1d..9f94118829 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -178,7 +178,7 @@ class SparkHadoopUtil extends Logging {
private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
val statisticsDataClass =
- Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
+ Utils.classForName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
statisticsDataClass.getDeclaredMethod(methodName)
}
@@ -356,7 +356,7 @@ object SparkHadoopUtil {
System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
- Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
+ Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7089a7e267..036cb6e054 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -624,7 +624,7 @@ object SparkSubmit {
var mainClass: Class[_] = null
try {
- mainClass = Class.forName(childMainClass, true, loader)
+ mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index ebb39c354d..b3710073e3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -576,7 +576,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
System.setSecurityManager(sm)
try {
- Class.forName(mainClass).getMethod("main", classOf[Array[String]])
+ Utils.classForName(mainClass).getMethod("main", classOf[Array[String]])
.invoke(null, Array(HELP))
} catch {
case e: InvocationTargetException =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 10638afb74..a076a9c3f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -228,7 +228,7 @@ object HistoryServer extends Logging {
val providerName = conf.getOption("spark.history.provider")
.getOrElse(classOf[FsHistoryProvider].getName())
- val provider = Class.forName(providerName)
+ val provider = Utils.classForName(providerName)
.getConstructor(classOf[SparkConf])
.newInstance(conf)
.asInstanceOf[ApplicationHistoryProvider]
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 48070768f6..245b047e7d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -172,7 +172,7 @@ private[master] class Master(
new FileSystemRecoveryModeFactory(conf, SerializationExtension(actorSystem))
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
- val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
+ val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serialization])
.newInstance(conf, SerializationExtension(actorSystem))
.asInstanceOf[StandaloneRecoveryModeFactory]
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala
index e6615a3174..ef5a7e35ad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala
@@ -128,7 +128,7 @@ private[spark] object SubmitRestProtocolMessage {
*/
def fromJson(json: String): SubmitRestProtocolMessage = {
val className = parseAction(json)
- val clazz = Class.forName(packagePrefix + "." + className)
+ val clazz = Utils.classForName(packagePrefix + "." + className)
.asSubclass[SubmitRestProtocolMessage](classOf[SubmitRestProtocolMessage])
fromJson(json, clazz)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 2d6be3042c..6799f78ec0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -53,7 +53,7 @@ object DriverWrapper {
Thread.currentThread.setContextClassLoader(loader)
// Delegate to supplied main class
- val clazz = Class.forName(mainClass, true, loader)
+ val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index e89d076802..5181142c5f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -149,6 +149,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
val ibmVendor = System.getProperty("java.vendor").contains("IBM")
var totalMb = 0
try {
+ // scalastyle:off classforname
val bean = ManagementFactory.getOperatingSystemMXBean()
if (ibmVendor) {
val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
@@ -159,6 +160,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
}
+ // scalastyle:on classforname
} catch {
case e: Exception => {
totalMb = 2*1024
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f7ef92bc80..1a02051c87 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -356,7 +356,7 @@ private[spark] class Executor(
logInfo("Using REPL class URI: " + classUri)
try {
val _userClassPathFirst: java.lang.Boolean = userClassPathFirst
- val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
+ val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
classOf[ClassLoader], classOf[Boolean])
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0d8ac1f80a..607d5a321e 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -63,8 +63,7 @@ private[spark] object CompressionCodec {
def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
val codec = try {
- val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
- .getConstructor(classOf[SparkConf])
+ val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
} catch {
case e: ClassNotFoundException => None
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 818f7a4c8d..87df42748b 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.{Logging, SparkEnv, TaskContext}
+import org.apache.spark.util.{Utils => SparkUtils}
private[spark]
trait SparkHadoopMapRedUtil {
@@ -64,10 +65,10 @@ trait SparkHadoopMapRedUtil {
private def firstAvailableClass(first: String, second: String): Class[_] = {
try {
- Class.forName(first)
+ SparkUtils.classForName(first)
} catch {
case e: ClassNotFoundException =>
- Class.forName(second)
+ SparkUtils.classForName(second)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
index 390d148bc9..943ebcb7bd 100644
--- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -21,6 +21,7 @@ import java.lang.{Boolean => JBoolean, Integer => JInteger}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}
+import org.apache.spark.util.Utils
private[spark]
trait SparkHadoopMapReduceUtil {
@@ -46,7 +47,7 @@ trait SparkHadoopMapReduceUtil {
isMap: Boolean,
taskId: Int,
attemptId: Int): TaskAttemptID = {
- val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
+ val klass = Utils.classForName("org.apache.hadoop.mapreduce.TaskAttemptID")
try {
// First, attempt to use the old-style constructor that takes a boolean isMap
// (not available in YARN)
@@ -57,7 +58,7 @@ trait SparkHadoopMapReduceUtil {
} catch {
case exc: NoSuchMethodException => {
// If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
- val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
+ val taskTypeClass = Utils.classForName("org.apache.hadoop.mapreduce.TaskType")
.asInstanceOf[Class[Enum[_]]]
val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
taskTypeClass, if (isMap) "MAP" else "REDUCE")
@@ -71,10 +72,10 @@ trait SparkHadoopMapReduceUtil {
private def firstAvailableClass(first: String, second: String): Class[_] = {
try {
- Class.forName(first)
+ Utils.classForName(first)
} catch {
case e: ClassNotFoundException =>
- Class.forName(second)
+ Utils.classForName(second)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index ed5131c79f..67f64d5e27 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -20,6 +20,8 @@ package org.apache.spark.metrics
import java.util.Properties
import java.util.concurrent.TimeUnit
+import org.apache.spark.util.Utils
+
import scala.collection.mutable
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
@@ -166,7 +168,7 @@ private[spark] class MetricsSystem private (
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
- val source = Class.forName(classPath).newInstance()
+ val source = Utils.classForName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
@@ -182,7 +184,7 @@ private[spark] class MetricsSystem private (
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
- val sink = Class.forName(classPath)
+ val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index bee59a437f..f1c17369cb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -383,11 +383,11 @@ private[spark] object HadoopRDD extends Logging {
private[spark] class SplitInfoReflections {
val inputSplitWithLocationInfo =
- Class.forName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
+ Utils.classForName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
- val newInputSplit = Class.forName("org.apache.hadoop.mapreduce.InputSplit")
+ val newInputSplit = Utils.classForName("org.apache.hadoop.mapreduce.InputSplit")
val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
- val splitLocationInfo = Class.forName("org.apache.hadoop.mapred.SplitLocationInfo")
+ val splitLocationInfo = Utils.classForName("org.apache.hadoop.mapred.SplitLocationInfo")
val isInMemory = splitLocationInfo.getMethod("isInMemory")
val getLocation = splitLocationInfo.getMethod("getLocation")
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 1709bdf560..c9fcc7a36c 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -39,8 +39,7 @@ private[spark] object RpcEnv {
val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory")
val rpcEnvName = conf.get("spark.rpc", "akka")
val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
- Class.forName(rpcEnvFactoryClassName, true, Utils.getContextOrSparkClassLoader).
- newInstance().asInstanceOf[RpcEnvFactory]
+ Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}
def create(
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 698d1384d5..4a5274b46b 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -62,8 +62,11 @@ private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoa
extends DeserializationStream {
private val objIn = new ObjectInputStream(in) {
- override def resolveClass(desc: ObjectStreamClass): Class[_] =
+ override def resolveClass(desc: ObjectStreamClass): Class[_] = {
+ // scalastyle:off classforname
Class.forName(desc.getName, false, loader)
+ // scalastyle:on classforname
+ }
}
def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index ed35cffe96..7cb6e08053 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -102,6 +102,7 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
try {
+ // scalastyle:off classforname
// Use the default classloader when calling the user registrator.
Thread.currentThread.setContextClassLoader(classLoader)
// Register classes given through spark.kryo.classesToRegister.
@@ -111,6 +112,7 @@ class KryoSerializer(conf: SparkConf)
userRegistrator
.map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator])
.foreach { reg => reg.registerClasses(kryo) }
+ // scalastyle:on classforname
} catch {
case e: Exception =>
throw new SparkException(s"Failed to register classes with Kryo", e)
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
index cc2f050681..a1b1e1631e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
@@ -407,7 +407,9 @@ private[spark] object SerializationDebugger extends Logging {
/** ObjectStreamClass$ClassDataSlot.desc field */
val DescField: Field = {
+ // scalastyle:off classforname
val f = Class.forName("java.io.ObjectStreamClass$ClassDataSlot").getDeclaredField("desc")
+ // scalastyle:on classforname
f.setAccessible(true)
f
}
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
index 291394ed34..db965d54ba 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -192,7 +192,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
.getOrElse(ExternalBlockStore.DEFAULT_BLOCK_MANAGER_NAME)
try {
- val instance = Class.forName(clsName)
+ val instance = Utils.classForName(clsName)
.newInstance()
.asInstanceOf[ExternalBlockManager]
instance.init(blockManager, executorId)
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 305de4c755..43626b4ef4 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -448,10 +448,12 @@ private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM
if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
&& argTypes(0).toString.startsWith("L") // is it an object?
&& argTypes(0).getInternalName == myName) {
+ // scalastyle:off classforname
output += Class.forName(
owner.replace('/', '.'),
false,
Thread.currentThread.getContextClassLoader)
+ // scalastyle:on classforname
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 0180399c9d..7d84468f62 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -124,9 +124,11 @@ object SizeEstimator extends Logging {
val server = ManagementFactory.getPlatformMBeanServer()
// NOTE: This should throw an exception in non-Sun JVMs
+ // scalastyle:off classforname
val hotSpotMBeanClass = Class.forName("com.sun.management.HotSpotDiagnosticMXBean")
val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
Class.forName("java.lang.String"))
+ // scalastyle:on classforname
val bean = ManagementFactory.newPlatformMXBeanProxy(server,
hotSpotMBeanName, hotSpotMBeanClass)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b6b932104a..e6374f17d8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -113,8 +113,11 @@ private[spark] object Utils extends Logging {
def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
val bis = new ByteArrayInputStream(bytes)
val ois = new ObjectInputStream(bis) {
- override def resolveClass(desc: ObjectStreamClass): Class[_] =
+ override def resolveClass(desc: ObjectStreamClass): Class[_] = {
+ // scalastyle:off classforname
Class.forName(desc.getName, false, loader)
+ // scalastyle:on classforname
+ }
}
ois.readObject.asInstanceOf[T]
}
@@ -177,12 +180,16 @@ private[spark] object Utils extends Logging {
/** Determines whether the provided class is loadable in the current thread. */
def classIsLoadable(clazz: String): Boolean = {
+ // scalastyle:off classforname
Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
+ // scalastyle:on classforname
}
+ // scalastyle:off classforname
/** Preferred alternative to Class.forName(className) */
def classForName(className: String): Class[_] = {
Class.forName(className, true, getContextOrSparkClassLoader)
+ // scalastyle:on classforname
}
/**
@@ -2266,7 +2273,7 @@ private [util] class SparkShutdownHookManager {
val hookTask = new Runnable() {
override def run(): Unit = runAll()
}
- Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
+ Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
.asInstanceOf[Int]
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 1d8fade90f..418763f4e5 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -179,6 +179,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
test("object files of classes from a JAR") {
+ // scalastyle:off classforname
val original = Thread.currentThread().getContextClassLoader
val className = "FileSuiteObjectFileTest"
val jar = TestUtils.createJarWithClasses(Seq(className))
@@ -201,6 +202,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
finally {
Thread.currentThread().setContextClassLoader(original)
}
+ // scalastyle:on classforname
}
test("write SequenceFile using new Hadoop API") {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index f89e3d0a49..dba46f101c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import org.scalatest.PrivateMethodTester
+import org.apache.spark.util.Utils
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
@@ -131,7 +132,7 @@ class SparkContextSchedulerCreationSuite
def testYarn(master: String, expectedClassName: String) {
try {
val sched = createTaskScheduler(master)
- assert(sched.getClass === Class.forName(expectedClassName))
+ assert(sched.getClass === Utils.classForName(expectedClassName))
} catch {
case e: SparkException =>
assert(e.getMessage.contains("YARN mode not available"))
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index e7878bde6f..343d28eef8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -541,8 +541,8 @@ object JarCreationTest extends Logging {
val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
var exception: String = null
try {
- Class.forName(args(0), true, Thread.currentThread().getContextClassLoader)
- Class.forName(args(1), true, Thread.currentThread().getContextClassLoader)
+ Utils.classForName(args(0))
+ Utils.classForName(args(1))
} catch {
case t: Throwable =>
exception = t + "\n" + t.getStackTraceString
diff --git a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
index 08215a2baf..05013fbc49 100644
--- a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
@@ -22,11 +22,12 @@ import java.sql._
import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import org.apache.spark.util.Utils
class JdbcRDDSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
before {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+ Utils.classForName("org.apache.derby.jdbc.EmbeddedDriver")
val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
try {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index 63a8480c9b..353b97469c 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -59,7 +59,9 @@ object KryoDistributedTest {
class AppJarRegistrator extends KryoRegistrator {
override def registerClasses(k: Kryo) {
val classLoader = Thread.currentThread.getContextClassLoader
+ // scalastyle:off classforname
k.register(Class.forName(AppJarRegistrator.customClassName, true, classLoader))
+ // scalastyle:on classforname
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index 4212554743..d3d464e84f 100644
--- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -84,7 +84,9 @@ class MutableURLClassLoaderSuite extends SparkFunSuite {
try {
sc.makeRDD(1 to 5, 2).mapPartitions { x =>
val loader = Thread.currentThread().getContextClassLoader
+ // scalastyle:off classforname
Class.forName(className, true, loader).newInstance()
+ // scalastyle:on classforname
Seq().iterator
}.count()
}