author | Reynold Xin <rxin@apache.org> | 2014-01-25 22:36:07 -0800
committer | Reynold Xin <rxin@apache.org> | 2014-01-25 22:36:07 -0800
commit | c66a2ef1c2dc9c218069b3ce8c39a49e5b92fc16 (patch)
tree | e6343f303a9f79b1fca0b29900b233d9d5c61f07
parent | 05be7047744c88e64e7e6bd973f9bcfacd00da5f (diff)
parent | 740e865f40704dc9158a6cf635990580fb6adcac (diff)
Merge pull request #511 from JoshRosen/SPARK-1040
Fix ClassCastException in JavaPairRDD.collectAsMap() (SPARK-1040)
This fixes [SPARK-1040](https://spark-project.atlassian.net/browse/SPARK-1040), an issue where JavaPairRDD.collectAsMap() could sometimes fail with ClassCastException. I applied the same fix to the Spark Streaming Java APIs. The commit message describes the fix in more detail.
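As a rough illustration (not part of this patch), the sketch below shows why the `ClassTag` matters here: a `ClassTag[AnyRef]` makes array-building code allocate an `Object[]`, which later fails a cast to `Tuple2[]`, while tagging the elements as `Tuple2[_, _]` produces a `Tuple2[]` and the same use site succeeds. The `materialize` helper is a hypothetical stand-in for collecting a dataset into an array; it is not Spark code.

```scala
import scala.reflect.ClassTag
import scala.util.Try

object ClassTagSketch {
  // Hypothetical stand-in for "collect this dataset into an array":
  // the ClassTag decides the runtime class of the array that gets built.
  def materialize[T: ClassTag](elems: Seq[T]): Array[T] = elems.toArray

  def main(args: Array[String]): Unit = {
    val pairs = Seq((1, Array(1)), (2, Array(2)))

    // Pre-fix behaviour: the tag is really ClassTag[AnyRef], so an Object[]
    // is allocated; using it where an Array[(Int, Array[Int])] (erased to
    // Tuple2[]) is expected throws ClassCastException.
    val wrongTag = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[(Int, Array[Int])]]
    val preFix = Try {
      val arr: Array[(Int, Array[Int])] = materialize(pairs)(wrongTag)
      arr.length
    }
    println(preFix)   // Failure(java.lang.ClassCastException: ...)

    // Post-fix behaviour: tag the elements as Tuple2, so a Tuple2[] is built
    // and the same code succeeds.
    val rightTag = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[(Int, Array[Int])]]
    val postFix = Try {
      val arr: Array[(Int, Array[Int])] = materialize(pairs)(rightTag)
      arr.length
    }
    println(postFix)  // Success(2)
  }
}
```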
I also increased the verbosity of JUnit test output under SBT to make it easier to verify that the Java tests are actually running.
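For reference, the SBT side of that change is just extra arguments passed to the junit-interface runner; a minimal sketch of the setting, with the flag meanings as I understand them from the junit-interface documentation, is:

```scala
// In an sbt build definition (e.g. project/SparkBuild.scala):
// pass extra options to the com.novocode junit-interface framework.
//   -v  log test names as they run (verbose output)
//   -a  show stack traces for failed assertions (AssertionErrors)
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
```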
6 files changed, 25 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9680c6f3e1..4db7339e67 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -88,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return a new RDD by applying a function to all elements of this RDD.
    */
   def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
   }
 
@@ -119,7 +119,7 @@
   def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
   }
 
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 23ec6c3b31..8c573ac0d6 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -387,18 +387,21 @@ public class JavaAPISuite implements Serializable {
         return 1.0 * x;
       }
     }).cache();
+    doubles.collect();
     JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
       @Override
       public Tuple2<Integer, Integer> call(Integer x) {
         return new Tuple2<Integer, Integer>(x, x);
       }
     }).cache();
+    pairs.collect();
     JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
       @Override
       public String call(Integer x) {
         return x.toString();
       }
     }).cache();
+    strings.collect();
   }
 
   @Test
@@ -962,4 +965,18 @@ public class JavaAPISuite implements Serializable {
     }
   }
 
+
+  @Test
+  public void collectAsMapWithIntArrayValues() {
+    // Regression test for SPARK-1040
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+      @Override
+      public Tuple2<Integer, int[]> call(Integer x) throws Exception {
+        return new Tuple2<Integer, int[]>(x, new int[] { x });
+      }
+    });
+    pairRDD.collect();  // Works fine
+    Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
+  }
 }
@@ -389,7 +389,7 @@
       <dependency>
         <groupId>com.novocode</groupId>
         <artifactId>junit-interface</artifactId>
-        <version>0.9</version>
+        <version>0.10</version>
         <scope>test</scope>
       </dependency>
       <dependency>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 46a1c640e3..e33f230188 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -212,12 +212,13 @@ object SparkBuild extends Build {
       "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
       "org.scalatest" %% "scalatest" % "1.9.1" % "test",
       "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
-      "com.novocode" % "junit-interface" % "0.9" % "test",
+      "com.novocode" % "junit-interface" % "0.10" % "test",
       "org.easymock" % "easymock" % "3.1" % "test",
       "org.mockito" % "mockito-all" % "1.8.5" % "test",
       "commons-io" % "commons-io" % "2.4" % "test"
     ),
+    testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
     parallelExecution := true,
     /* Workaround for issue #206 (fixed after SBT 0.11.0) */
     watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a493a8279f..64fe204cdf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
   def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
   }
 
@@ -159,7 +159,7 @@
   def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+    def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 79fa6a623d..62cfa0a229 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -745,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   override val classTag: ClassTag[(K, V)] =
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+    implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
 }
 
 object JavaPairDStream {