From 740e865f40704dc9158a6cf635990580fb6adcac Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 25 Jan 2014 16:39:20 -0800 Subject: Fix ClassCastException in JavaPairRDD.collectAsMap() (SPARK-1040) This fixes an issue where collectAsMap() could fail when called on a JavaPairRDD that was derived by transforming a non-JavaPairRDD. The root problem was that we were creating the JavaPairRDD's ClassTag by casting a ClassTag[AnyRef] to a ClassTag[Tuple2[K2, V2]]. To fix this, I cast a ClassTag[Tuple2[_, _]] instead, since this actually produces a ClassTag of the appropriate type because ClassTags don't capture type parameters: scala> implicitly[ClassTag[Tuple2[_, _]]] == implicitly[ClassTag[Tuple2[Int, Int]]] res8: Boolean = true scala> implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[Int, Int]]] == implicitly[ClassTag[Tuple2[Int, Int]]] res9: Boolean = false --- .../scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 4 ++-- .../scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index a493a8279f..64fe204cdf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** Return a new DStream by applying a function to all elements of this DStream. */ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { - def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) } @@ -159,7 +159,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] + def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 79fa6a623d..62cfa0a229 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -745,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } override val classTag: ClassTag[(K, V)] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]] + implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]] } object JavaPairDStream { -- cgit v1.2.3