-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala       82
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala            4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala       37
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala  85
4 files changed, 108 insertions, 100 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index cd0aea0cb3..3f672900cb 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,35 +17,29 @@
package org.apache.spark.api.java
-import java.util.{List => JList}
-import java.util.Comparator
+import java.util.{Comparator, List => JList}
-import scala.Tuple2
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import com.google.common.base.Optional
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.HashPartitioner
-import org.apache.spark.Partitioner
+import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext.rddToPairRDDFunctions
-import org.apache.spark.api.java.function.{Function2 => JFunction2}
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.partial.BoundedDouble
-import org.apache.spark.partial.PartialResult
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.OrderedRDDFunctions
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.storage.StorageLevel
-
-class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K],
- implicit val vClassTag: ClassTag[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
+ (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
+ extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
@@ -158,7 +152,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
- implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ implicit val ctag: ClassTag[C] = fakeClassTag
fromRDD(rdd.combineByKey(
createCombiner,
mergeValue,
@@ -284,19 +278,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* RDD will be <= us.
*/
def subtractByKey[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, V] = {
- implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val ctag: ClassTag[W] = fakeClassTag
fromRDD(rdd.subtractByKey(other))
}
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, V] = {
- implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val ctag: ClassTag[W] = fakeClassTag
fromRDD(rdd.subtractByKey(other, numPartitions))
}
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W](other: JavaPairRDD[K, W], p: Partitioner): JavaPairRDD[K, V] = {
- implicit val cmw: ClassTag[W] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val ctag: ClassTag[W] = fakeClassTag
fromRDD(rdd.subtractByKey(other, p))
}
@@ -345,7 +339,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
- implicit val cm: ClassTag[C] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ implicit val ctag: ClassTag[C] = fakeClassTag
fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
}
@@ -438,7 +432,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
- implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.mapValues(f))
}
@@ -449,7 +443,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
- implicit val cm: ClassTag[U] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
}
@@ -682,31 +676,35 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
}
object JavaPairRDD {
- def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassTag[K],
- vcm: ClassTag[T]): RDD[(K, JList[T])] =
- rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList _)
-
- def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassTag[K],
- vcm: ClassTag[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd)
- .mapValues((x: (Seq[V], Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
-
- def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1],
- Seq[W2]))])(implicit kcm: ClassTag[K]) : RDD[(K, (JList[V], JList[W1],
- JList[W2]))] = rddToPairRDDFunctions(rdd).mapValues(
- (x: (Seq[V], Seq[W1], Seq[W2])) => (seqAsJavaList(x._1),
- seqAsJavaList(x._2),
- seqAsJavaList(x._3)))
-
- def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
+ private[spark]
+ def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = {
+ rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList)
+ }
+
+ private[spark]
+ def cogroupResultToJava[K: ClassTag, V, W](
+ rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = {
+ rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
+ }
+
+ private[spark]
+ def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
+ rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = {
+ rddToPairRDDFunctions(rdd)
+ .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3)))
+ }
+
+ def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
new JavaPairRDD[K, V](rdd)
+ }
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
- implicit val cmk: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val cmv: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ implicit val ctagK: ClassTag[K] = fakeClassTag
+ implicit val ctagV: ClassTag[V] = fakeClassTag
new JavaPairRDD[K, V](rdd.rdd)
}
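
The incantation removed throughout this file, `implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]`, and its replacement, `fakeClassTag` (added to JavaSparkContext at the end of this diff), yield the same tag; the helper just names the trick once. A minimal sketch, outside the patch, of that equivalence:

import scala.reflect.ClassTag

def oldStyle[T]: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def newStyle[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]   // body of fakeClassTag

// Both report java.lang.Object as the erased runtime class for any T.
assert(oldStyle[String].runtimeClass == newStyle[String].runtimeClass)
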
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 7d48ce01cf..7b73057953 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -24,8 +24,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
-class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
-JavaRDDLike[T, JavaRDD[T]] {
+class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
+ extends JavaRDDLike[T, JavaRDD[T]] {
override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index fcb9729c10..24a9925dbd 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,8 @@
package org.apache.spark.api.java
-import java.util.{List => JList, Comparator}
+import java.util.{Comparator, List => JList}
+
import scala.Tuple2
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -25,14 +26,14 @@ import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.spark.{SparkContext, Partition, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.api.java.JavaPairRDD._
-import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
-import org.apache.spark.partial.{PartialResult, BoundedDouble}
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _}
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
@@ -88,8 +89,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
- new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
+ val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
+ new JavaPairRDD(rdd.map(f)(ctag))(f.keyType(), f.valueType())
}
/**
@@ -119,8 +120,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
- JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
+ val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
+ JavaPairRDD.fromRDD(rdd.flatMap(fn)(ctag))(f.keyType(), f.valueType())
}
/**
@@ -202,10 +203,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
- implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val vcm: ClassTag[JList[T]] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
- JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm)
+ implicit val ctagK: ClassTag[K] = fakeClassTag
+ implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
+ JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))
}
/**
@@ -213,10 +213,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
- implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val vcm: ClassTag[JList[T]] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
- JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm)
+ implicit val ctagK: ClassTag[K] = fakeClassTag
+ implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
+ JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))
}
/**
@@ -407,7 +406,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
- implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val ctag: ClassTag[K] = fakeClassTag
JavaPairRDD.fromRDD(rdd.keyBy(f))
}
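
In the groupBy and keyBy changes above, the explicit `(kcm, vcm)` argument lists disappear: once the faked tags are declared as implicit vals, they satisfy the `[K: ClassTag]` context bounds on JavaPairRDD.fromRDD and groupByResultToJava automatically. A small sketch of that resolution, using hypothetical method names rather than the Spark API:

import scala.reflect.ClassTag

// `describe` and `caller` are illustrative only.
def describe[K: ClassTag](xs: Seq[K]): String =
  implicitly[ClassTag[K]].runtimeClass.getName

def caller[K](xs: Seq[K]): String = {
  implicit val ctagK: ClassTag[K] = ClassTag.AnyRef.asInstanceOf[ClassTag[K]]
  describe(xs)   // the context bound is filled by ctagK; no explicit argument list needed
}
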
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 22dc9c9e2e..dc26b7f621 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -23,19 +23,17 @@ import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import com.google.common.base.Optional
import org.apache.spark._
-import org.apache.spark.SparkContext.IntAccumulatorParam
-import org.apache.spark.SparkContext.DoubleAccumulatorParam
+import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-
/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
@@ -96,7 +94,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
- implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ implicit val ctag: ClassTag[T] = fakeClassTag
sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
}
@@ -107,8 +105,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Distribute a local Scala collection to form an RDD. */
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
: JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val vcm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ implicit val ctagK: ClassTag[K] = fakeClassTag
+ implicit val ctagV: ClassTag[V] = fakeClassTag
JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
}
@@ -149,8 +147,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+ implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
}
@@ -163,8 +161,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*/
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+ implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
}
@@ -176,8 +174,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
- implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- sc.objectFile(path, minSplits)(cm)
+ implicit val ctag: ClassTag[T] = fakeClassTag
+ sc.objectFile(path, minSplits)(ctag)
}
/**
@@ -188,8 +186,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T](path: String): JavaRDD[T] = {
- implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- sc.objectFile(path)(cm)
+ implicit val ctag: ClassTag[T] = fakeClassTag
+ sc.objectFile(path)(ctag)
}
/**
@@ -209,8 +207,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+ implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
}
@@ -229,8 +227,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
keyClass: Class[K],
valueClass: Class[V]
): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+ implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
}
@@ -248,8 +246,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+ implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
}
@@ -266,8 +264,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
keyClass: Class[K],
valueClass: Class[V]
): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+ implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
+ implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path,
inputFormatClass, keyClass, valueClass))
}
@@ -287,8 +285,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
kClass: Class[K],
vClass: Class[V],
conf: Configuration): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(kClass)
- implicit val vcm: ClassTag[V] = ClassTag(vClass)
+ implicit val ctagK: ClassTag[K] = ClassTag(kClass)
+ implicit val ctagV: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
}
@@ -306,26 +304,26 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(kClass)
- implicit val vcm: ClassTag[V] = ClassTag(vClass)
+ implicit val ctagK: ClassTag[K] = ClassTag(kClass)
+ implicit val ctagV: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
/** Build the union of two or more RDDs. */
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
- implicit val cm: ClassTag[T] = first.classTag
- sc.union(rdds)(cm)
+ implicit val ctag: ClassTag[T] = first.classTag
+ sc.union(rdds)
}
/** Build the union of two or more RDDs. */
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
: JavaPairRDD[K, V] = {
val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
- implicit val cm: ClassTag[(K, V)] = first.classTag
- implicit val kcm: ClassTag[K] = first.kClassTag
- implicit val vcm: ClassTag[V] = first.vClassTag
- new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
+ implicit val ctag: ClassTag[(K, V)] = first.classTag
+ implicit val ctagK: ClassTag[K] = first.kClassTag
+ implicit val ctagV: ClassTag[V] = first.vClassTag
+ new JavaPairRDD(sc.union(rdds))
}
/** Build the union of two or more RDDs. */
@@ -447,8 +445,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir)
protected def checkpointFile[T](path: String): JavaRDD[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ implicit val ctag: ClassTag[T] = fakeClassTag
new JavaRDD(sc.checkpointFile(path))
}
@@ -535,4 +532,18 @@ object JavaSparkContext {
* your driver program.
*/
def jarOfObject(obj: AnyRef): Array[String] = SparkContext.jarOfObject(obj).toArray
+
+ /**
+ * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+ *
+ * This method is used to keep ClassTags out of the external Java API, as the Java compiler
+ * cannot produce them automatically. While this ClassTag-faking does please the compiler,
+ * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
+ *
+ * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, just worse performance
+ * or security issues. For instance, an Array[AnyRef] can hold any type T, but may lose primitive
+ * specialization.
+ */
+ private[spark]
+ def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}
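
As the new scaladoc warns, the faked tag always reports java.lang.Object as the runtime class, so anything that uses it to build arrays gets an Object[] rather than a specialized primitive array. A minimal illustration, not part of the patch:

import scala.reflect.ClassTag

def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]

fakeClassTag[Int].runtimeClass      // class java.lang.Object, not int
ClassTag.Int.runtimeClass           // int
ClassTag.Int.newArray(2).getClass   // class [I -- a real int[] with primitive specialization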