author    Prashant Sharma <prashant.s@imaginea.com>    2014-03-03 22:31:30 -0800
committer Patrick Wendell <pwendell@gmail.com>         2014-03-03 22:31:30 -0800
commit    181ec5030792a10f3ce77e997d0e2eda9bcd6139 (patch)
tree      9b88504e5a3eca8177e4ebe1257ea9ce56120c13 /core
parent    b14ede789abfabe25144385e8dc2fb96691aba81 (diff)
[java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs
Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #17 from ScrapCodes/java8-lambdas and squashes the following commits:

95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch.
85a954e [Prashant Sharma] Nit. import orderings.
673f7ac [Prashant Sharma] Added support for -java-home as well
80a13e8 [Prashant Sharma] Used fake class tag syntax
26eb3f6 [Prashant Sharma] Patrick's comments on PR.
35d8d79 [Prashant Sharma] Specified java 8 building in the docs
31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag.
4ab87d3 [Prashant Sharma] Review feedback on the pr
c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
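The heart of this patch is converting the Java API's abstract function classes into single-abstract-method interfaces, so Java 8 lambdas can replace anonymous inner classes. A minimal sketch of the usage this enables, not part of the patch itself; `rdd` is an assumed existing JavaRDD<Integer>:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Old style: anonymous subclass spelling out call().
JavaRDD<Integer> viaAnonymousClass = rdd.map(new Function<Integer, Integer>() {
  @Override
  public Integer call(Integer x) throws Exception {
    return x * 2;
  }
});

// Java 8 style: the same Function, now a single-method interface,
// written directly as a lambda.
JavaRDD<Integer> viaLambda = rdd.map(x -> x * 2);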
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala) | 11
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java | 27
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala) | 10
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala) | 10
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/Function.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/Function.scala) | 12
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/Function2.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/Function2.scala) | 10
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/Function3.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/Function3.scala) | 9
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java | 30
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/PairFunction.java | 29
-rw-r--r-- core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java | 27
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 67
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 82
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala | 29
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala | 36
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala | 33
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala | 33
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala | 32
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala | 32
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala | 34
-rw-r--r-- core/src/test/java/org/apache/spark/JavaAPISuite.java | 38
22 files changed, 241 insertions, 354 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 7500a89436..57fd0a7a80 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import java.lang.{Double => JDouble, Iterable => JIterable}
+import java.io.Serializable;
/**
* A function that returns zero or more records of type Double from each input record.
*/
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-abstract class DoubleFlatMapFunction[T] extends WrappedFunction1[T, JIterable[JDouble]]
- with Serializable {
- // Intentionally left blank
+public interface DoubleFlatMapFunction<T> extends Serializable {
+ public Iterable<Double> call(T t) throws Exception;
}
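A sketch of how the interface form of DoubleFlatMapFunction pairs with the flatMapToDouble method introduced later in this patch; `rdd` is an assumed JavaRDD<Integer> and java.util.Arrays is assumed imported:

// Each input yields two doubles; any Iterable<Double> satisfies call().
JavaDoubleRDD doubled = rdd.flatMapToDouble(x -> Arrays.asList(1.0 * x, 2.0 * x));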
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
new file mode 100644
index 0000000000..150144e0e4
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
+public interface DoubleFunction<T> extends Serializable {
+ public double call(T t) throws Exception;
+}
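With call() now returning a primitive double rather than java.lang.Double, a lambda that produces a double fits directly. A small sketch, assuming an existing JavaRDD<String> named `words`:

// Map each string to its length as a double, yielding a JavaDoubleRDD.
JavaDoubleRDD lengths = words.mapToDouble(s -> (double) s.length());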
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index bdb01f7670..fa75842047 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import scala.reflect.ClassTag
+import java.io.Serializable;
/**
* A function that returns zero or more output records from each input record.
*/
-abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
- def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
-}
+public interface FlatMapFunction<T, R> extends Serializable {
+ public Iterable<R> call(T t) throws Exception;
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index aae1349c5e..d1fdec0724 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import scala.reflect.ClassTag
+import java.io.Serializable;
/**
* A function that takes two inputs and returns zero or more output records.
*/
-abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
- def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
-}
+public interface FlatMapFunction2<T1, T2, R> extends Serializable {
+ public Iterable<R> call(T1 t1, T2 t2) throws Exception;
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala b/core/src/main/java/org/apache/spark/api/java/function/Function.java
index a5e1701f77..d00551bb0a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -15,17 +15,15 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
+import java.io.Serializable;
/**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
+ * Base interface for functions whose return types do not create special RDDs. PairFunction and
* DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
* when mapping RDDs of other types.
*/
-abstract class Function[T, R] extends WrappedFunction1[T, R] with Serializable {
- def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function<T1, R> extends Serializable {
+ public R call(T1 v1) throws Exception;
}
-
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
index fa3616cbcb..793caaa61a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
@@ -15,15 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
+import java.io.Serializable;
/**
* A two-argument function that takes arguments of type T1 and T2 and returns an R.
*/
-abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable {
- def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function2<T1, T2, R> extends Serializable {
+ public R call(T1 v1, T2 v2) throws Exception;
}
-
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
index 45152891e9..b4151c3417 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
+import java.io.Serializable;
/**
* A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
*/
-abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable {
- def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function3<T1, T2, T3, R> extends Serializable {
+ public R call(T1 v1, T2 v2, T3 v3) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
new file mode 100644
index 0000000000..691ef2eceb
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
+public interface PairFlatMapFunction<T, K, V> extends Serializable {
+ public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+}
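A sketch of PairFlatMapFunction as a lambda via the flatMapToPair method added in this patch; `lines` is an assumed JavaRDD<String>, with scala.Tuple2 and java.util.* assumed imported:

// Emit one (word, 1) pair per word; the returned List is the Iterable<Tuple2<K, V>>.
JavaPairRDD<String, Integer> ones = lines.flatMapToPair(line -> {
  List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
  for (String w : line.split(" ")) {
    out.add(new Tuple2<String, Integer>(w, 1));
  }
  return out;
});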
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
new file mode 100644
index 0000000000..abd9bcc07a
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+
+/**
+ * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
+ */
+public interface PairFunction<T, K, V> extends Serializable {
+ public Tuple2<K, V> call(T t) throws Exception;
+}
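The interface form of PairFunction likewise collapses to one line; a sketch with the same assumed `words` RDD:

// Build a JavaPairRDD of (word, length) pairs with a single lambda.
JavaPairRDD<String, Integer> byLength =
  words.mapToPair(w -> new Tuple2<String, Integer>(w, w.length()));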
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
new file mode 100644
index 0000000000..2a10435b75
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function with no return value.
+ */
+public interface VoidFunction<T> extends Serializable {
+ public void call(T t) throws Exception;
+}
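As an interface, VoidFunction no longer needs the companion implicit conversion the old Scala version carried; a lambda with a void body suffices. A sketch with the assumed `rdd`:

// Side-effecting action: print each element; call() returns nothing.
rdd.foreach(x -> System.out.println(x));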
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 071044463d..d1787061bc 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -83,7 +83,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: JFunction[JDouble, java.lang.Boolean]): JavaDoubleRDD =
- fromRDD(srdd.filter(x => f(x).booleanValue()))
+ fromRDD(srdd.filter(x => f.call(x).booleanValue()))
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 3f672900cb..857626fe84 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.storage.StorageLevel
@@ -89,7 +89,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
- new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
+ new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue()))
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
@@ -165,9 +165,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Simplified version of combineByKey that hash-partitions the output RDD.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
- mergeValue: JFunction2[C, V, C],
- mergeCombiners: JFunction2[C, C, C],
- numPartitions: Int): JavaPairRDD[K, C] =
+ mergeValue: JFunction2[C, V, C],
+ mergeCombiners: JFunction2[C, C, C],
+ numPartitions: Int): JavaPairRDD[K, C] =
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
/**
@@ -442,7 +442,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
- def fn = (x: V) => f.apply(x).asScala
+ def fn = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
}
@@ -511,49 +511,49 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
/** Output the RDD to any Hadoop-supported file system. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F],
- conf: JobConf) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F],
+ conf: JobConf) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}
/** Output the RDD to any Hadoop-supported file system. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F]) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F]) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
}
/** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F],
- codec: Class[_ <: CompressionCodec]) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F],
+ codec: Class[_ <: CompressionCodec]) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
}
/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F],
- conf: Configuration) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F],
+ conf: Configuration) {
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}
/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F]) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F]) {
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
}
@@ -700,6 +700,15 @@ object JavaPairRDD {
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
+ private[spark]
+ implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
+ (x: T1, x1: T2) => fun.call(x, x1)
+ }
+
+ private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x)
+
+ private[spark]
+ implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y)
/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
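The three private implicits added above take over the role of the deleted WrappedFunction* classes: they convert a user-supplied Java function into a Scala function at the call site inside JavaPairRDD. From the Java side nothing visible changes; a hedged sketch, with `rdd` an assumed JavaRDD<Integer>:

// reduce() accepts a Function2; a conversion like toScalaFunction2 is what
// lets the underlying Scala RDD consume it.
Integer sum = rdd.reduce((a, b) -> a + b);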
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index d7ce8fdfc2..e973c46edd 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -70,7 +70,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
- wrapRDD(rdd.filter((x => f(x).booleanValue())))
+ wrapRDD(rdd.filter((x => f.call(x).booleanValue())))
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 729668fb67..af0114bee3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -67,7 +67,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[R](f: JFunction[T, R]): JavaRDD[R] =
- new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
+ new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
@@ -82,15 +82,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
- def map[R](f: DoubleFunction[T]): JavaDoubleRDD =
- new JavaDoubleRDD(rdd.map(x => f(x).doubleValue()))
+ def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
+ new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
+ }
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
- def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
- new JavaPairRDD(rdd.map(f)(ctag))(f.keyType(), f.valueType())
+ def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
+ def cm = implicitly[ClassTag[(K2, V2)]]
+ new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -99,17 +100,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType())
+ def fn = (x: T) => f.call(x).asScala
+ JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
- def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
+ def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
+ def fn = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
}
@@ -117,19 +118,19 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
- def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
+ def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
- JavaPairRDD.fromRDD(rdd.flatMap(fn)(ctag))(f.keyType(), f.valueType())
+ def fn = (x: T) => f.call(x).asScala
+ def cm = implicitly[ClassTag[(K2, V2)]]
+ JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
@@ -137,52 +138,53 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ JavaRDD.fromRDD(
+ rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
- * Return a new RDD by applying a function to each partition of this RDD.
+ * Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
}
/**
- * Return a new RDD by applying a function to each partition of this RDD.
+ * Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
-
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]],
- preservesPartitioning: Boolean): JavaDoubleRDD = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+ preservesPartitioning: Boolean): JavaDoubleRDD = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
- .map((x: java.lang.Double) => x.doubleValue()))
+ .map(x => x.doubleValue()))
}
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ JavaPairRDD.fromRDD(
+ rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
- rdd.foreachPartition((x => f(asJavaIterator(x))))
+ rdd.foreachPartition((x => f.call(asJavaIterator(x))))
}
/**
@@ -205,7 +207,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
- JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))
+ JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
}
/**
@@ -215,7 +217,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
- JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))
+ JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
}
/**
@@ -255,9 +257,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
- f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
+ f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
JavaRDD.fromRDD(
- rdd.zipPartitions(other.rdd)(fn)(other.classTag, f.elementType()))(f.elementType())
+ rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
}
// Actions (launch a job to return a value to the user program)
@@ -266,7 +268,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Applies a function f to all elements of this RDD.
*/
def foreach(f: VoidFunction[T]) {
- val cleanF = rdd.context.clean(f)
+ val cleanF = rdd.context.clean((x: T) => f.call(x))
rdd.foreach(cleanF)
}
@@ -320,7 +322,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
combOp: JFunction2[U, U, U]): U =
- rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType)
+ rdd.aggregate(zeroValue)(seqOp, combOp)(fakeClassTag[U])
/**
* Return the number of elements in the RDD.
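The renames in this file (map to mapToDouble/mapToPair, flatMap to flatMapToDouble/flatMapToPair, and the mapPartitions variants) exist because overloads that differ only in functional-interface type are ambiguous targets for a lambda; distinct method names give the compiler a single target type. A sketch of the partition-wise variant, with `rdd` an assumed JavaRDD<Integer> and java.util.Collections assumed imported:

// Count the elements of each partition, emitting one ("count", n) pair per partition.
JavaPairRDD<String, Integer> counts = rdd.mapPartitionsToPair(it -> {
  int n = 0;
  while (it.hasNext()) { it.next(); n++; }
  return Collections.singletonList(new Tuple2<String, Integer>("count", n));
});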
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
deleted file mode 100644
index 2cdf2e92c3..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Double => JDouble}
-
-/**
- * A function that returns Doubles, and can be used to construct DoubleRDDs.
- */
-// DoubleFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and DoubleFunction.
-abstract class DoubleFunction[T] extends WrappedFunction1[T, JDouble] with Serializable {
- // Intentionally left blank
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
deleted file mode 100644
index 8467bbb892..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Iterable => JIterable}
-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
-
-/**
- * A function that returns zero or more key-value pair records from each input record. The
- * key-value pairs are represented as scala.Tuple2 objects.
- */
-// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and PairFlatMapFunction.
-abstract class PairFlatMapFunction[T, K, V] extends WrappedFunction1[T, JIterable[(K, V)]]
- with Serializable {
-
- def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
-
- def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
deleted file mode 100644
index d0ba0b6307..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
-
-/**
- * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
- */
-// PairFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and PairFunction.
-abstract class PairFunction[T, K, V] extends WrappedFunction1[T, (K, V)] with Serializable {
-
- def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
-
- def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
deleted file mode 100644
index ea94313a4a..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-/**
- * A function with no return value.
- */
-// This allows Java users to write void methods without having to return Unit.
-abstract class VoidFunction[T] extends Serializable {
- @throws(classOf[Exception])
- def call(t: T) : Unit
-}
-
-// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
-// return Unit), so it is implicitly converted to a Function1[T, Unit]:
-object VoidFunction {
- implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x))
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
deleted file mode 100644
index cfe694f65d..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction1
-
-/**
- * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
- @throws(classOf[Exception])
- def call(t: T): R
-
- final def apply(t: T): R = call(t)
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
deleted file mode 100644
index eb9277c6fb..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction2
-
-/**
- * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
- @throws(classOf[Exception])
- def call(t1: T1, t2: T2): R
-
- final def apply(t1: T1, t2: T2): R = call(t1, t2)
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
deleted file mode 100644
index d314dbdf1d..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction3
-
-/**
- * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
- extends AbstractFunction3[T1, T2, T3, R] {
- @throws(classOf[Exception])
- def call(t1: T1, t2: T2, t3: T3): R
-
- final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3)
-}
-
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index aa5079c159..c7d0e2d577 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -386,14 +386,14 @@ public class JavaAPISuite implements Serializable {
@Test
public void map() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+ JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
@Override
- public Double call(Integer x) {
+ public double call(Integer x) {
return 1.0 * x;
}
}).cache();
doubles.collect();
- JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
+ JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer x) {
return new Tuple2<Integer, Integer>(x, x);
@@ -422,7 +422,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals("Hello", words.first());
Assert.assertEquals(11, words.count());
- JavaPairRDD<String, String> pairs = rdd.flatMap(
+ JavaPairRDD<String, String> pairs = rdd.flatMapToPair(
new PairFlatMapFunction<String, String, String>() {
@Override
@@ -436,7 +436,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
Assert.assertEquals(11, pairs.count());
- JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
+ JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
@Override
public Iterable<Double> call(String s) {
List<Double> lengths = new LinkedList<Double>();
@@ -459,7 +459,7 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
// Regression test for SPARK-668:
- JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
+ JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
@@ -469,7 +469,7 @@ public class JavaAPISuite implements Serializable {
swapped.collect();
// There was never a bug here, but it's worth testing:
- pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+ pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
return item.swap();
@@ -592,7 +592,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -601,7 +601,7 @@ public class JavaAPISuite implements Serializable {
// Try reading the output back as an object file
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
- Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
+ Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
@@ -622,7 +622,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -653,7 +653,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -713,7 +713,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -743,7 +743,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -766,9 +766,9 @@ public class JavaAPISuite implements Serializable {
@Test
public void zip() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+ JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
@Override
- public Double call(Integer x) {
+ public double call(Integer x) {
return 1.0 * x;
}
});
@@ -893,13 +893,13 @@ public class JavaAPISuite implements Serializable {
@Test
public void mapOnPairRDD() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) throws Exception {
return new Tuple2<Integer, Integer>(i, i % 2);
}
});
- JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+ JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
@@ -919,7 +919,7 @@ public class JavaAPISuite implements Serializable {
public void collectPartitions() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) throws Exception {
return new Tuple2<Integer, Integer>(i, i % 2);
@@ -984,7 +984,7 @@ public class JavaAPISuite implements Serializable {
public void collectAsMapWithIntArrayValues() {
// Regression test for SPARK-1040
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
- JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+ JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(new PairFunction<Integer, Integer, int[]>() {
@Override
public Tuple2<Integer, int[]> call(Integer x) throws Exception {
return new Tuple2<Integer, int[]>(x, new int[] { x });