author    Reynold Xin <rxin@apache.org>  2014-01-25 22:36:07 -0800
committer Reynold Xin <rxin@apache.org>  2014-01-25 22:36:07 -0800
commit    c66a2ef1c2dc9c218069b3ce8c39a49e5b92fc16 (patch)
tree      e6343f303a9f79b1fca0b29900b233d9d5c61f07
parent    05be7047744c88e64e7e6bd973f9bcfacd00da5f (diff)
parent    740e865f40704dc9158a6cf635990580fb6adcac (diff)
Merge pull request #511 from JoshRosen/SPARK-1040
Fix ClassCastException in JavaPairRDD.collectAsMap() (SPARK-1040)

This fixes [SPARK-1040](https://spark-project.atlassian.net/browse/SPARK-1040), an issue where JavaPairRDD.collectAsMap() could sometimes fail with ClassCastException. I applied the same fix to the Spark Streaming Java APIs. The commit message describes the fix in more detail. I also increased the verbosity of JUnit test output under SBT to make it easier to verify that the Java tests are actually running.
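For context on the root cause: collect() materializes an RDD into an Array whose runtime element type comes from the ClassTag in scope, so the old implicitly[ClassTag[AnyRef]] produced an Object[] that cannot satisfy the checkcast to Tuple2[] that Scala inserts at statically-typed call sites such as collectAsMap(). A minimal sketch of the failure mode, using a hypothetical collectLike stand-in rather than Spark's own code:

```scala
import scala.reflect.ClassTag

// Stand-in for RDD.collect(): the ClassTag in scope decides the runtime
// element type of the returned array.
def collectLike[T: ClassTag](elems: Seq[T]): Array[T] = elems.toArray

// Pre-fix tag: claims to describe Tuple2s but allocates an Object[].
val badTag  = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[(Int, Int)]]
// Post-fix tag: allocates a genuine Tuple2[].
val goodTag = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[(Int, Int)]]

// The static result type Array[(Int, Int)] makes the compiler checkcast the
// returned array to Tuple2[]; with badTag that cast is the ClassCastException
// this patch fixes, so the line is left commented out.
// val crashes: Array[(Int, Int)] = collectLike(Seq((1, 1)))(badTag)
val works: Array[(Int, Int)] = collectLike(Seq((1, 1)))(goodTag)
```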
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala                    |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/JavaAPISuite.java                             | 17
-rw-r--r--  pom.xml                                                                            |  2
-rw-r--r--  project/SparkBuild.scala                                                           |  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala |  2
6 files changed, 25 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9680c6f3e1..4db7339e67 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -88,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -119,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 23ec6c3b31..8c573ac0d6 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -387,18 +387,21 @@ public class JavaAPISuite implements Serializable {
return 1.0 * x;
}
}).cache();
+ doubles.collect();
JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer x) {
return new Tuple2<Integer, Integer>(x, x);
}
}).cache();
+ pairs.collect();
JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
@Override
public String call(Integer x) {
return x.toString();
}
}).cache();
+ strings.collect();
}
@Test
@@ -962,4 +965,18 @@ public class JavaAPISuite implements Serializable {
}
}
+
+ @Test
+ public void collectAsMapWithIntArrayValues() {
+ // Regression test for SPARK-1040
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+ JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+ @Override
+ public Tuple2<Integer, int[]> call(Integer x) throws Exception {
+ return new Tuple2<Integer, int[]>(x, new int[] { x });
+ }
+ });
+ pairRDD.collect(); // Works fine
+ Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+ }
}
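One detail worth noting in the first hunk above: the three added collect() calls are what force the cached RDDs to actually compute, since map() alone only records a transformation. Spark evaluates lazily, which the following sketch imitates with a Scala view (illustrative only, not Spark code):

```scala
// Transformations record a plan; an action forces it to run. A lazy view
// behaves the same way in miniature.
val plan = Seq(1, 2, 3).view.map { x => println(s"mapping $x"); x * 2 }
// Nothing has printed yet, just like an RDD with no action called on it.
val results = plan.toList // forcing the view plays the "action" role; now it prints
```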
diff --git a/pom.xml b/pom.xml
index 54072b053c..1ac8f0fa07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -389,7 +389,7 @@
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
- <version>0.9</version>
+ <version>0.10</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 46a1c640e3..e33f230188 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -212,12 +212,13 @@ object SparkBuild extends Build {
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
- "com.novocode" % "junit-interface" % "0.9" % "test",
+ "com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.8.5" % "test",
"commons-io" % "commons-io" % "2.4" % "test"
),
+ testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
parallelExecution := true,
/* Workaround for issue #206 (fixed after SBT 0.11.0) */
watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,
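The added testOptions line is what produces the more verbose JUnit output mentioned in the pull request description; the junit-interface bump to 0.10 in pom.xml and here travels with it. As I read junit-interface's flags, -v logs each test as it starts and finishes, and -a prints full stack traces for assertion failures. The setting in isolation, for an sbt build:

```scala
// Pass flags through to the junit-interface runner:
//   -v  log every test as it starts and finishes
//   -a  show stack traces (and exception class) for assertion failures
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
```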
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a493a8279f..64fe204cdf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -159,7 +159,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 79fa6a623d..62cfa0a229 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -745,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
override val classTag: ClassTag[(K, V)] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+ implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
}
object JavaPairDStream {