Diffstat (limited to 'core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 24 ++++++++++++------------
1 file changed, 12 insertions(+), 12 deletions(-)
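The patch below is mechanical: every direct Class.forName(...) call in PythonRDD's Hadoop input/output helpers is replaced with Utils.classForName(...), so that key, value, input-format, output-format, and compression-codec classes named from PySpark are resolved through Spark's class loader rather than the caller's defining loader. A hedged sketch of what that helper most likely does follows the diff.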
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fefe1cb6f1..9f5c5bd30f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging {
       batchSize: Int) = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
@@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
@@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
     for {
       k <- Option(keyClass)
       v <- Option(valueClass)
-    } yield (Class.forName(k), Class.forName(v))
+    } yield (Utils.classForName(k), Utils.classForName(v))
   }
 
   private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging {
     val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
       inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
-    val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+    val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
   }
 
@@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging {
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
   }
 
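For context, bare Class.forName(name) resolves against the defining class loader of the calling class, which in Spark can miss classes shipped with --jars or otherwise added to the application class path at runtime; routing every lookup through a single helper that consults the thread's context class loader avoids that. The following is a minimal, self-contained sketch of that idiom, assuming Utils.classForName (in org.apache.spark.util.Utils) follows the standard context-loader pattern; the object name ClassForNameSketch and the body are illustrative, not quoted from Spark:

// Sketch of the context-class-loader idiom that Utils.classForName is
// assumed to follow (illustration, not the actual Spark source).
object ClassForNameSketch {

  // Prefer the thread's context class loader, which spark-submit points at
  // the user's jars; fall back to the loader that loaded this class.
  private def getContextOrSparkClassLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)

  // initialize = true mirrors the behavior of plain Class.forName(className)
  def classForName(className: String): Class[_] =
    Class.forName(className, true, getContextOrSparkClassLoader)

  def main(args: Array[String]): Unit = {
    // Same call shape as the patched sites: look up by name, then cast.
    val kc = classForName("java.lang.String").asInstanceOf[Class[String]]
    println(kc.getName) // prints "java.lang.String"
  }
}

Centralizing the lookup in one helper also makes the convention easy to enforce mechanically, for example with a style rule that flags any remaining direct Class.forName call.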