path: root/core
author    Matei Zaharia <matei@eecs.berkeley.edu>    2012-07-27 12:23:27 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>    2012-07-27 12:23:27 -0700
commit    b51d733a5783ef29077951e842882bb002a4139e (patch)
tree      d8098fcdfa33408d7716aedc80057836cb6f2bcc /core
parent    5c5aa2ff8135b531ba84861157693144d42ef455 (diff)
Fixed Java union methods having same erasure.
Changed the List-based union() methods to take a separate "first element" argument so the compiler can tell the overloads apart: Java 7 considered it an error for them all to take Lists parameterized with different types, because those parameters share the same erasure.
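A minimal sketch of the clash and of the fix applied here (hypothetical class name ErasureExample; JavaRDD and JavaPairRDD are the types touched by this patch): under erasure the old List-only overloads both collapse to union(List), which Java 7 rejects as a duplicate declaration, while a distinct first parameter keeps the erased signatures apart.

    package spark.api.java;

    import java.util.List;

    abstract class ErasureExample {
        // Pre-patch shape -- would NOT compile under Java 7, since both methods
        // erase to union(List) and therefore clash:
        //
        //   abstract <T> JavaRDD<T> union(List<JavaRDD<T>> rdds);
        //   abstract <K, V> JavaPairRDD<K, V> union(List<JavaPairRDD<K, V>> rdds);
        //
        // Post-patch shape -- the first RDD is a separate argument, so the erased
        // signatures differ: union(JavaRDD, List) vs. union(JavaPairRDD, List).
        abstract <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
        abstract <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
    }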
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/api/java/JavaSparkContext.scala                 | 23
-rw-r--r--  core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java | 42
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java                               |  7
3 files changed, 47 insertions(+), 25 deletions(-)
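Caller-side, the varargs overload is unchanged and the List overload now takes the first RDD separately. A hedged usage sketch (hypothetical helper class UnionUsageSketch; sc, s1, s2 mirror the JavaSparkContext and JavaRDD<String> values set up in the test change below):

    import java.util.ArrayList;
    import java.util.List;

    import spark.api.java.JavaRDD;
    import spark.api.java.JavaSparkContext;

    class UnionUsageSketch {
        static JavaRDD<String> unionBoth(JavaSparkContext sc, JavaRDD<String> s1, JavaRDD<String> s2) {
            // Varargs form: unchanged for callers; internally it delegates to the
            // (first, rest) overload added in this patch.
            JavaRDD<String> fromVarargs = sc.union(s1, s2);

            // List form: the first RDD is now a separate argument and only the
            // remaining RDDs go in the list. The result is equivalent to fromVarargs.
            List<JavaRDD<String>> rest = new ArrayList<JavaRDD<String>>();
            rest.add(s2);
            return sc.union(s1, rest);
        }
    }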
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 00e1aff58c..2d43bfa4ef 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -174,22 +174,23 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
- override def union[T](jrdds: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
- val rdds: Seq[RDD[T]] = asScalaBuffer(jrdds).map(_.rdd)
- implicit val cm: ClassManifest[T] = jrdds.head.classManifest
+ override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
+ val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
+ implicit val cm: ClassManifest[T] = first.classManifest
sc.union(rdds: _*)(cm)
}
- override def union[K, V](jrdds: java.util.List[JavaPairRDD[K, V]]): JavaPairRDD[K, V] = {
- val rdds: Seq[RDD[(K, V)]] = asScalaBuffer(jrdds).map(_.rdd)
- implicit val cm: ClassManifest[(K, V)] = jrdds.head.classManifest
- implicit val kcm: ClassManifest[K] = jrdds.head.kManifest
- implicit val vcm: ClassManifest[V] = jrdds.head.vManifest
+ override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
+ : JavaPairRDD[K, V] = {
+ val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
+ implicit val cm: ClassManifest[(K, V)] = first.classManifest
+ implicit val kcm: ClassManifest[K] = first.kManifest
+ implicit val vcm: ClassManifest[V] = first.vManifest
new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm)
}
- override def union(jrdds: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
- val rdds: Seq[RDD[Double]] = asScalaBuffer(jrdds).map(_.srdd)
+ override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
+ val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
new JavaDoubleRDD(sc.union(rdds: _*))
}
@@ -215,4 +216,4 @@ object JavaSparkContext {
implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java
index 505a090f67..97344e73da 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ b/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java
@@ -1,25 +1,47 @@
package spark.api.java;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
// See
// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
abstract class JavaSparkContextVarargsWorkaround {
- public <T> JavaRDD<T> union(JavaRDD<T> ... rdds) {
- return union(Arrays.asList(rdds));
+ public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
+ if (rdds.length == 0) {
+ throw new IllegalArgumentException("Union called on empty list");
+ }
+ ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
+ for (int i = 1; i < rdds.length; i++) {
+ rest.add(rdds[i]);
+ }
+ return union(rdds[0], rest);
}
- public JavaDoubleRDD union(JavaDoubleRDD ... rdds) {
- return union(Arrays.asList(rdds));
+ public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
+ if (rdds.length == 0) {
+ throw new IllegalArgumentException("Union called on empty list");
+ }
+ ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
+ for (int i = 1; i < rdds.length; i++) {
+ rest.add(rdds[i]);
+ }
+ return union(rdds[0], rest);
}
- public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> ... rdds) {
- return union(Arrays.asList(rdds));
+ public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
+ if (rdds.length == 0) {
+ throw new IllegalArgumentException("Union called on empty list");
+ }
+ ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
+ for (int i = 1; i < rdds.length; i++) {
+ rest.add(rdds[i]);
+ }
+ return union(rdds[0], rest);
}
- abstract public <T> JavaRDD<T> union(List<JavaRDD<T>> rdds);
- abstract public JavaDoubleRDD union(List<JavaDoubleRDD> rdds);
- abstract public <K, V> JavaPairRDD<K, V> union(List<JavaPairRDD<K, V>> rdds);
-
+ // These methods take separate "first" and "rest" elements to avoid having the same type erasure
+ abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
+ abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
+ abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 436a8ab0c7..5f0293e55b 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -66,10 +66,9 @@ public class JavaAPISuite implements Serializable {
JavaRDD<String> sUnion = sc.union(s1, s2);
Assert.assertEquals(4, sUnion.count());
// List
- List<JavaRDD<String>> srdds = new ArrayList<JavaRDD<String>>();
- srdds.add(s1);
- srdds.add(s2);
- sUnion = sc.union(srdds);
+ List<JavaRDD<String>> list = new ArrayList<JavaRDD<String>>();
+ list.add(s2);
+ sUnion = sc.union(s1, list);
Assert.assertEquals(4, sUnion.count());
// Union of JavaDoubleRDDs