author     Matei Zaharia <matei@eecs.berkeley.edu>   2013-02-22 14:56:18 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2013-02-22 14:56:18 -0800
commit     7341de0d48a4f9d11e13e3cd47c8fb2c3ab9d23e (patch)
tree       67c6260d24eb3ef32aaaa7ae9c2ac595c39cf1c8 /core
parent     fcf7fa84486b743bc84730dfaa29927e25c1690d (diff)
parent     e9fb25426ea0b6dbe4c946243a2ac0836b031c58 (diff)
Merge pull request #475 from JoshRosen/spark-668
Remove hack workaround for SPARK-668
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala          | 13
-rw-r--r--  core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java | 20
2 files changed, 7 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 90b45cf875..d884529d7a 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
import com.google.common.base.Optional
-trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] {
+trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
implicit val classManifest: ClassManifest[T]
@@ -82,12 +82,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
}
/**
- * Part of the workaround for SPARK-668; called in PairFlatMapWorkaround.java.
+ * Return a new RDD by first applying a function to all elements of this
+ * RDD, and then flattening the results.
*/
- private[spark] def doFlatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
+ def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -110,8 +111,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]):
- JavaPairRDD[K, V] = {
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+ JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
}
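
With the workaround removed, a Java caller reaches this pair-producing flatMap overload directly on the RDD. Below is a minimal usage sketch, not part of the patch, assuming the 0.7-era spark.api.java API in which PairFlatMapFunction<T, K, V> is implemented by overriding call(T) to return an Iterable of scala.Tuple2<K, V>; the class and variable names are illustrative only.

// Usage sketch (not part of the patch): assumes the 0.7-era Java API where
// PairFlatMapFunction<T, K, V> is implemented by overriding call(T) to
// return an Iterable<scala.Tuple2<K, V>>. Names are illustrative only.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.PairFlatMapFunction;

public class PairFlatMapExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "PairFlatMapExample");
    JavaRDD<String> lines = sc.parallelize(Arrays.asList("a b", "b c"));

    // With PairFlatMapWorkaround gone, flatMap with a PairFlatMapFunction is
    // invoked directly on the RDD; the call site itself is unchanged.
    JavaPairRDD<String, Integer> pairs = lines.flatMap(
        new PairFlatMapFunction<String, String, Integer>() {
          @Override
          public Iterable<Tuple2<String, Integer>> call(String line) {
            // Emit one (word, 1) pair per word in the line.
            List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
            for (String word : line.split(" ")) {
              out.add(new Tuple2<String, Integer>(word, 1));
            }
            return out;
          }
        });

    System.out.println(pairs.collect());
    sc.stop();
  }
}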
diff --git a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java b/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
deleted file mode 100644
index 68b6fd6622..0000000000
--- a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package spark.api.java;
-
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaRDDLike;
-import spark.api.java.function.PairFlatMapFunction;
-
-import java.io.Serializable;
-
-/**
- * Workaround for SPARK-668.
- */
-class PairFlatMapWorkaround<T> implements Serializable {
- /**
- * Return a new RDD by first applying a function to all elements of this
- * RDD, and then flattening the results.
- */
- public <K, V> JavaPairRDD<K, V> flatMap(PairFlatMapFunction<T, K, V> f) {
- return ((JavaRDDLike <T, ?>) this).doFlatMap(f);
- }
-}
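
For completeness, a second hedged sketch under the same assumptions about the 0.7-era Java API covers the mapPartitions overload touched above; its K, V to K2, V2 change is only a type-parameter rename, so existing callers compile unchanged. The helper countPerPartition and the "partition" label are hypothetical.

// Sketch for the renamed mapPartitions overload (type parameters K, V were
// renamed to K2, V2; caller code is unaffected). Same assumptions as the
// previous sketch; countPerPartition is a hypothetical helper.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import scala.Tuple2;

import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.function.PairFlatMapFunction;

public class PartitionCountExample {
  // Emits a single ("partition", size) pair for each partition of the input.
  static JavaPairRDD<String, Integer> countPerPartition(JavaRDD<String> rdd) {
    return rdd.mapPartitions(
        new PairFlatMapFunction<Iterator<String>, String, Integer>() {
          @Override
          public Iterable<Tuple2<String, Integer>> call(Iterator<String> it) {
            // Count the elements in this partition.
            int n = 0;
            while (it.hasNext()) {
              it.next();
              n++;
            }
            List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>(1);
            out.add(new Tuple2<String, Integer>("partition", n));
            return out;
          }
        });
  }
}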