-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala) | 11
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java | 27
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala) | 10
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala) | 10
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/Function.scala) | 12
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function2.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/Function2.scala) | 10
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function3.java (renamed from core/src/main/scala/org/apache/spark/api/java/function/Function3.scala) | 9
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java | 30
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/PairFunction.java | 29
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 67
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 82
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala | 33
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala | 33
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala | 34
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java | 38
-rwxr-xr-x  dev/run-tests | 10
-rw-r--r--  docs/building-with-maven.md | 12
-rw-r--r--  docs/java-programming-guide.md | 56
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java | 6
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaKMeans.java | 2
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java | 2
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaPageRank.java | 6
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaTC.java | 6
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaWordCount.java | 2
-rw-r--r--  examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java | 4
-rw-r--r--  examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java | 2
-rw-r--r--  examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java | 2
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java | 2
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java | 2
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java | 2
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala | 6
-rw-r--r--  extras/README.md | 1
-rw-r--r--  extras/java8-tests/README.md | 24
-rw-r--r--  extras/java8-tests/pom.xml | 151
-rw-r--r--  extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java | 391
-rw-r--r--  extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java | 841
-rw-r--r--  extras/java8-tests/src/test/resources/log4j.properties | 28
-rw-r--r--  pom.xml | 25
-rw-r--r--  project/SparkBuild.scala | 20
-rwxr-xr-x  sbt/sbt-launch-lib.bash | 11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 119
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 101
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 4
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 62
52 files changed, 1946 insertions(+), 551 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 7500a89436..57fd0a7a80 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import java.lang.{Double => JDouble, Iterable => JIterable}
+import java.io.Serializable;
/**
* A function that returns zero or more records of type Double from each input record.
*/
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-abstract class DoubleFlatMapFunction[T] extends WrappedFunction1[T, JIterable[JDouble]]
- with Serializable {
- // Intentionally left blank
+public interface DoubleFlatMapFunction<T> extends Serializable {
+ public Iterable<Double> call(T t) throws Exception;
}
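
A minimal usage sketch of the reworked interface, paired with the `flatMapToDouble` method this patch adds to JavaRDDLike (names such as `sc` and `lines` are illustrative; java.util imports elided):

    JavaRDD<String> lines = sc.parallelize(Arrays.asList("to be or", "not to be"));
    // Emit one Double per word: the word's length.
    JavaDoubleRDD lengths = lines.flatMapToDouble(new DoubleFlatMapFunction<String>() {
      @Override
      public Iterable<Double> call(String line) {
        List<Double> out = new ArrayList<Double>();
        for (String word : line.split(" ")) {
          out.add((double) word.length());
        }
        return out;
      }
    });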
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
new file mode 100644
index 0000000000..150144e0e4
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
+public interface DoubleFunction<T> extends Serializable {
+ public double call(T t) throws Exception;
+}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index bdb01f7670..fa75842047 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import scala.reflect.ClassTag
+import java.io.Serializable;
/**
* A function that returns zero or more output records from each input record.
*/
-abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
- def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
-}
+public interface FlatMapFunction<T, R> extends Serializable {
+ public Iterable<R> call(T t) throws Exception;
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index aae1349c5e..d1fdec0724 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import scala.reflect.ClassTag
+import java.io.Serializable;
/**
* A function that takes two inputs and returns zero or more output records.
*/
-abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
- def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]]
-}
+public interface FlatMapFunction2<T1, T2, R> extends Serializable {
+ public Iterable<R> call(T1 t1, T2 t2) throws Exception;
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala b/core/src/main/java/org/apache/spark/api/java/function/Function.java
index a5e1701f77..d00551bb0a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -15,17 +15,15 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
+import java.io.Serializable;
/**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
+ * Base interface for functions whose return types do not create special RDDs. PairFunction and
* DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
* when mapping RDDs of other types.
*/
-abstract class Function[T, R] extends WrappedFunction1[T, R] with Serializable {
- def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function<T1, R> extends Serializable {
+ public R call(T1 v1) throws Exception;
}
-
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
index fa3616cbcb..793caaa61a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
@@ -15,15 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
+import java.io.Serializable;
/**
* A two-argument function that takes arguments of type T1 and T2 and returns an R.
*/
-abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable {
- def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function2<T1, T2, R> extends Serializable {
+ public R call(T1 v1, T2 v2) throws Exception;
}
-
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
index 45152891e9..b4151c3417 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.api.java.function
+package org.apache.spark.api.java.function;
-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
+import java.io.Serializable;
/**
* A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
*/
-abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable {
- def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag
+public interface Function3<T1, T2, T3, R> extends Serializable {
+ public R call(T1 v1, T2 v2, T3 v3) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
new file mode 100644
index 0000000000..691ef2eceb
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
+public interface PairFlatMapFunction<T, K, V> extends Serializable {
+ public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
new file mode 100644
index 0000000000..abd9bcc07a
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+
+/**
+ * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
+ */
+public interface PairFunction<T, K, V> extends Serializable {
+ public Tuple2<K, V> call(T t) throws Exception;
+}
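
A minimal sketch of the interface in use with the new `mapToPair` method (assuming a JavaRDD<String> named `words`):

    // The Tuple2 return type is what lets mapToPair build a JavaPairRDD.
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });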
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
new file mode 100644
index 0000000000..2a10435b75
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A function with no return value.
+ */
+public interface VoidFunction<T> extends Serializable {
+ public void call(T t) throws Exception;
+}
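
A minimal sketch of the interface driving a side-effecting action (assuming a JavaRDD<String> named `words`):

    words.foreach(new VoidFunction<String>() {
      @Override
      public void call(String s) {
        System.out.println(s);  // runs on the executors, not the driver
      }
    });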
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 071044463d..d1787061bc 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -83,7 +83,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: JFunction[JDouble, java.lang.Boolean]): JavaDoubleRDD =
- fromRDD(srdd.filter(x => f(x).booleanValue()))
+ fromRDD(srdd.filter(x => f.call(x).booleanValue()))
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 3f672900cb..857626fe84 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.storage.StorageLevel
@@ -89,7 +89,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
- new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
+ new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue()))
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
@@ -165,9 +165,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Simplified version of combineByKey that hash-partitions the output RDD.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
- mergeValue: JFunction2[C, V, C],
- mergeCombiners: JFunction2[C, C, C],
- numPartitions: Int): JavaPairRDD[K, C] =
+ mergeValue: JFunction2[C, V, C],
+ mergeCombiners: JFunction2[C, C, C],
+ numPartitions: Int): JavaPairRDD[K, C] =
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
/**
@@ -442,7 +442,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
- def fn = (x: V) => f.apply(x).asScala
+ def fn = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
}
@@ -511,49 +511,49 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
/** Output the RDD to any Hadoop-supported file system. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F],
- conf: JobConf) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F],
+ conf: JobConf) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}
/** Output the RDD to any Hadoop-supported file system. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F]) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F]) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
}
/** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F],
- codec: Class[_ <: CompressionCodec]) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F],
+ codec: Class[_ <: CompressionCodec]) {
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
}
/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F],
- conf: Configuration) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F],
+ conf: Configuration) {
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}
/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[F]) {
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[F]) {
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass)
}
@@ -700,6 +700,15 @@ object JavaPairRDD {
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
+ private[spark]
+ implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
+ (x: T1, x1: T2) => fun.call(x, x1)
+ }
+
+ private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x)
+
+ private[spark]
+ implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y)
/** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
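
The three private implicits above adapt the now-plain Java interfaces to the Scala functions the underlying RDD API expects, taking over the role of the deleted WrappedFunction* adapters. From the Java side nothing changes; a minimal sketch (assuming a JavaPairRDD<String, Integer> named `ones`):

    // The Function2 is converted to a Scala (Integer, Integer) => Integer internally.
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
          return a + b;
        }
      });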
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index d7ce8fdfc2..e973c46edd 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -70,7 +70,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
- wrapRDD(rdd.filter((x => f(x).booleanValue())))
+ wrapRDD(rdd.filter((x => f.call(x).booleanValue())))
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 729668fb67..af0114bee3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -67,7 +67,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[R](f: JFunction[T, R]): JavaRDD[R] =
- new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
+ new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
@@ -82,15 +82,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
- def map[R](f: DoubleFunction[T]): JavaDoubleRDD =
- new JavaDoubleRDD(rdd.map(x => f(x).doubleValue()))
+ def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
+ new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
+ }
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
- def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
- new JavaPairRDD(rdd.map(f)(ctag))(f.keyType(), f.valueType())
+ def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
+ def cm = implicitly[ClassTag[(K2, V2)]]
+ new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -99,17 +100,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType())
+ def fn = (x: T) => f.call(x).asScala
+ JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
- def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
+ def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
+ def fn = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue()))
}
@@ -117,19 +118,19 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
- def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
+ def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- val ctag = implicitly[ClassTag[Tuple2[K2, V2]]]
- JavaPairRDD.fromRDD(rdd.flatMap(fn)(ctag))(f.keyType(), f.valueType())
+ def fn = (x: T) => f.call(x).asScala
+ def cm = implicitly[ClassTag[(K2, V2)]]
+ JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
@@ -137,52 +138,53 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ JavaRDD.fromRDD(
+ rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
- * Return a new RDD by applying a function to each partition of this RDD.
+ * Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
}
/**
- * Return a new RDD by applying a function to each partition of this RDD.
+ * Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
-
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]],
- preservesPartitioning: Boolean): JavaDoubleRDD = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+ preservesPartitioning: Boolean): JavaDoubleRDD = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
- .map((x: java.lang.Double) => x.doubleValue()))
+ .map(x => x.doubleValue()))
}
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ JavaPairRDD.fromRDD(
+ rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
- rdd.foreachPartition((x => f(asJavaIterator(x))))
+ rdd.foreachPartition((x => f.call(asJavaIterator(x))))
}
/**
@@ -205,7 +207,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
- JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))
+ JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
}
/**
@@ -215,7 +217,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
- JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))
+ JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
}
/**
@@ -255,9 +257,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
- f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
+ f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
JavaRDD.fromRDD(
- rdd.zipPartitions(other.rdd)(fn)(other.classTag, f.elementType()))(f.elementType())
+ rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
}
// Actions (launch a job to return a value to the user program)
@@ -266,7 +268,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Applies a function f to all elements of this RDD.
*/
def foreach(f: VoidFunction[T]) {
- val cleanF = rdd.context.clean(f)
+ val cleanF = rdd.context.clean((x: T) => f.call(x))
rdd.foreach(cleanF)
}
@@ -320,7 +322,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U],
combOp: JFunction2[U, U, U]): U =
- rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType)
+ rdd.aggregate(zeroValue)(seqOp, combOp)(fakeClassTag[U])
/**
* Return the number of elements in the RDD.
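
The map/flatMap/mapPartitions overloads above are split into distinct names (mapToPair, flatMapToDouble, mapPartitionsToPair, and so on) because a Java 8 lambda, unlike an anonymous class, does not name the function interface it implements, so overloads differing only in that interface become ambiguous. A sketch of one renamed entry point (assuming a JavaRDD<String> named `lines`; java.util imports elided):

    JavaPairRDD<String, Integer> tagged = lines.mapPartitionsToPair(
      new PairFlatMapFunction<Iterator<String>, String, Integer>() {
        @Override
        public Iterable<Tuple2<String, Integer>> call(Iterator<String> it) {
          List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
          while (it.hasNext()) {
            out.add(new Tuple2<String, Integer>(it.next(), 1));
          }
          return out;
        }
      });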
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
deleted file mode 100644
index 2cdf2e92c3..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Double => JDouble}
-
-/**
- * A function that returns Doubles, and can be used to construct DoubleRDDs.
- */
-// DoubleFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and DoubleFunction.
-abstract class DoubleFunction[T] extends WrappedFunction1[T, JDouble] with Serializable {
- // Intentionally left blank
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
deleted file mode 100644
index 8467bbb892..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import java.lang.{Iterable => JIterable}
-import org.apache.spark.api.java.JavaSparkContext
-import scala.reflect.ClassTag
-
-/**
- * A function that returns zero or more key-value pair records from each input record. The
- * key-value pairs are represented as scala.Tuple2 objects.
- */
-// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and PairFlatMapFunction.
-abstract class PairFlatMapFunction[T, K, V] extends WrappedFunction1[T, JIterable[(K, V)]]
- with Serializable {
-
- def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
-
- def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
deleted file mode 100644
index d0ba0b6307..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.reflect.ClassTag
-import org.apache.spark.api.java.JavaSparkContext
-
-/**
- * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
- */
-// PairFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and PairFunction.
-abstract class PairFunction[T, K, V] extends WrappedFunction1[T, (K, V)] with Serializable {
-
- def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag
-
- def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
deleted file mode 100644
index ea94313a4a..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-/**
- * A function with no return value.
- */
-// This allows Java users to write void methods without having to return Unit.
-abstract class VoidFunction[T] extends Serializable {
- @throws(classOf[Exception])
- def call(t: T) : Unit
-}
-
-// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
-// return Unit), so it is implicitly converted to a Function1[T, Unit]:
-object VoidFunction {
- implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x))
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
deleted file mode 100644
index cfe694f65d..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction1
-
-/**
- * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
- @throws(classOf[Exception])
- def call(t: T): R
-
- final def apply(t: T): R = call(t)
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
deleted file mode 100644
index eb9277c6fb..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction2
-
-/**
- * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
- @throws(classOf[Exception])
- def call(t1: T1, t2: T2): R
-
- final def apply(t1: T1, t2: T2): R = call(t1, t2)
-}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
deleted file mode 100644
index d314dbdf1d..0000000000
--- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java.function
-
-import scala.runtime.AbstractFunction3
-
-/**
- * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
- extends AbstractFunction3[T1, T2, T3, R] {
- @throws(classOf[Exception])
- def call(t1: T1, t2: T2, t3: T3): R
-
- final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3)
-}
-
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index aa5079c159..c7d0e2d577 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -386,14 +386,14 @@ public class JavaAPISuite implements Serializable {
@Test
public void map() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+ JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
@Override
- public Double call(Integer x) {
+ public double call(Integer x) {
return 1.0 * x;
}
}).cache();
doubles.collect();
- JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
+ JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer x) {
return new Tuple2<Integer, Integer>(x, x);
@@ -422,7 +422,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals("Hello", words.first());
Assert.assertEquals(11, words.count());
- JavaPairRDD<String, String> pairs = rdd.flatMap(
+ JavaPairRDD<String, String> pairs = rdd.flatMapToPair(
new PairFlatMapFunction<String, String, String>() {
@Override
@@ -436,7 +436,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
Assert.assertEquals(11, pairs.count());
- JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
+ JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
@Override
public Iterable<Double> call(String s) {
List<Double> lengths = new LinkedList<Double>();
@@ -459,7 +459,7 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
// Regression test for SPARK-668:
- JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
+ JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
@@ -469,7 +469,7 @@ public class JavaAPISuite implements Serializable {
swapped.collect();
// There was never a bug here, but it's worth testing:
- pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+ pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
return item.swap();
@@ -592,7 +592,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -601,7 +601,7 @@ public class JavaAPISuite implements Serializable {
// Try reading the output back as an object file
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
- Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
+ Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
@@ -622,7 +622,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -653,7 +653,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -713,7 +713,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -743,7 +743,7 @@ public class JavaAPISuite implements Serializable {
);
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
- rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+ rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
@@ -766,9 +766,9 @@ public class JavaAPISuite implements Serializable {
@Test
public void zip() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+ JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
@Override
- public Double call(Integer x) {
+ public double call(Integer x) {
return 1.0 * x;
}
});
@@ -893,13 +893,13 @@ public class JavaAPISuite implements Serializable {
@Test
public void mapOnPairRDD() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) throws Exception {
return new Tuple2<Integer, Integer>(i, i % 2);
}
});
- JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+ JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
@@ -919,7 +919,7 @@ public class JavaAPISuite implements Serializable {
public void collectPartitions() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
- JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) throws Exception {
return new Tuple2<Integer, Integer>(i, i % 2);
@@ -984,7 +984,7 @@ public class JavaAPISuite implements Serializable {
public void collectAsMapWithIntArrayValues() {
// Regression test for SPARK-1040
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
- JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+ JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(new PairFunction<Integer, Integer, int[]>() {
@Override
public Tuple2<Integer, int[]> call(Integer x) throws Exception {
return new Tuple2<Integer, int[]>(x, new int[] { x });
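
The new extras/java8-tests suites listed in the diffstat (their bodies are not shown here) exercise the same operations through lambdas; as a sketch, a mapToPair call like the one above collapses to:

    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x));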
diff --git a/dev/run-tests b/dev/run-tests
index d65a397b4c..cf0b940c09 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -27,6 +27,16 @@ rm -rf ./work
# Fail fast
set -e
+if test -x "$JAVA_HOME/bin/java"; then
+ declare java_cmd="$JAVA_HOME/bin/java"
+else
+ declare java_cmd=java
+fi
+
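+# Keep only the major and minor digits of the version string, so that
+# `java version "1.7.0_45"` yields 17 and any JDK 1.8+ passes the check below.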
+JAVA_VERSION=$($java_cmd -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+[ "$JAVA_VERSION" -ge 18 ] && echo "" || echo "[Warn] Java 8 tests will not run, because JDK version is < 1.8."
+
+
echo "========================================================================="
echo "Running Scala style checks"
echo "========================================================================="
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index ded1292688..a982c4dbac 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -25,6 +25,8 @@ If you don't run this, you may see errors like the following:
You can fix this by setting the `MAVEN_OPTS` variable as discussed before.
+*Note: For Java 1.8 and above, this step is not required.*
+
## Specifying the Hadoop version ##
Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the "hadoop.version" property. If unset, Spark will build against Hadoop 1.0.4 by default.
@@ -76,3 +78,13 @@ The maven build includes support for building a Debian package containing the as
$ mvn -Pdeb -DskipTests clean package
The debian package can then be found under assembly/target. We added the short commit hash to the file name so that we can distinguish individual packages built for SNAPSHOT versions.
+
+## Running Java 8 test suites
+
+To run only the Java 8 tests and nothing else:
+
+ $ mvn install -DskipTests -Pjava8-tests
+
+Java 8 tests run whenever the -Pjava8-tests profile is enabled; they run even though -DskipTests is set.
+For these tests to run, your system must have a JDK 8 installation.
+If you have JDK 8 installed but it is not the system default, you can set JAVA_HOME to point to JDK 8 before running the tests.
diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md
index 5c73dbb25e..6632360f6e 100644
--- a/docs/java-programming-guide.md
+++ b/docs/java-programming-guide.md
@@ -21,15 +21,21 @@ operations (e.g. map) and handling RDDs of different types, as discussed next.
There are a few key differences between the Java and Scala APIs:
-* Java does not support anonymous or first-class functions, so functions must
- be implemented by extending the
+* Java does not support anonymous or first-class functions, so functions are passed
+ using anonymous classes that implement the
[`org.apache.spark.api.java.function.Function`](api/core/index.html#org.apache.spark.api.java.function.Function),
[`Function2`](api/core/index.html#org.apache.spark.api.java.function.Function2), etc.
- classes.
+ interfaces.
* To maintain type safety, the Java API defines specialized Function and RDD
classes for key-value pairs and doubles. For example,
[`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD)
stores key-value pairs.
+* Some methods are named for the return type of the function object
+ (or, in Java 8, lambda expression) that they take: for example, mapToPair(...)
+ and flatMapToPair(...) return a
+ [`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD),
+ while mapToDouble(...) and flatMapToDouble(...) return a
+ [`JavaDoubleRDD`](api/core/index.html#org.apache.spark.api.java.JavaDoubleRDD).
* RDD methods like `collect()` and `countByKey()` return Java collections types,
such as `java.util.List` and `java.util.Map`.
* Key-value pairs, which are simply written as `(key, value)` in Scala, are represented
@@ -53,10 +59,10 @@ each specialized RDD class, so filtering a `PairRDD` returns a new `PairRDD`,
etc (this achieves the "same-result-type" principle used by the [Scala collections
framework](http://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html)).
-## Function Classes
+## Function Interfaces
-The following table lists the function classes used by the Java API. Each
-class has a single abstract method, `call()`, that must be implemented.
+The following table lists the function interfaces used by the Java API. Each
+interface has a single abstract method, `call()`, that must be implemented.
<table class="table">
<tr><th>Class</th><th>Function Type</th></tr>
@@ -78,7 +84,6 @@ RDD [storage level](scala-programming-guide.html#rdd-persistence) constants, suc
declared in the [org.apache.spark.api.java.StorageLevels](api/core/index.html#org.apache.spark.api.java.StorageLevels) class. To
define your own storage level, you can use StorageLevels.create(...).
-
# Other Features
The Java API supports other Spark features, including
@@ -86,6 +91,21 @@ The Java API supports other Spark features, including
[broadcast variables](scala-programming-guide.html#broadcast-variables), and
[caching](scala-programming-guide.html#rdd-persistence).
+# Upgrading From Pre-1.0 Versions of Spark
+
+In version 1.0 of Spark the Java API was refactored to better support Java 8
+lambda expressions. Users upgrading from older versions of Spark should note
+the following changes:
+
+* All `org.apache.spark.api.java.function.*` classes have been changed from
+  abstract classes to interfaces. This means that concrete implementations of
+  these `Function` classes will need to use `implements` rather than `extends`.
+* Certain transformation functions now have multiple versions depending
+ on the return type. In Spark core, the map functions (map, flatMap,
+  mapPartitions) have type-specific versions, e.g.
+ [`mapToPair`](api/core/index.html#org.apache.spark.api.java.JavaRDD@mapToPair[K2,V2](f:org.apache.spark.api.java.function.PairFunction[T,K2,V2]):org.apache.spark.api.java.JavaPairRDD[K2,V2])
+ and [`mapToDouble`](api/core/index.html#org.apache.spark.api.java.JavaRDD@mapToDouble[R](f:org.apache.spark.api.java.function.DoubleFunction[T]):org.apache.spark.api.java.JavaDoubleRDD).
+  Spark Streaming also uses the same approach, e.g. [`transformToPair`](api/streaming/index.html#org.apache.spark.streaming.api.java.JavaDStream@transformToPair[K2,V2](transformFunc:org.apache.spark.api.java.function.Function[R,org.apache.spark.api.java.JavaPairRDD[K2,V2]]):org.apache.spark.streaming.api.java.JavaPairDStream[K2,V2]), as sketched below.
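+
+As a minimal sketch (assuming an existing `JavaRDD<String> words`), the function type you
+pass now determines the returned RDD type:
+
+{% highlight java %}
+// Passing a PairFunction yields a JavaPairRDD:
+JavaPairRDD<String, Integer> lengths = words.mapToPair(
+  new PairFunction<String, String, Integer>() {
+    public Tuple2<String, Integer> call(String s) {
+      return new Tuple2<String, Integer>(s, s.length());
+    }
+  });
+// Passing a DoubleFunction yields a JavaDoubleRDD:
+JavaDoubleRDD lengthsAsDoubles = words.mapToDouble(
+  new DoubleFunction<String>() {
+    public double call(String s) {
+      return s.length();
+    }
+  });
+{% endhighlight %}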
# Example
@@ -127,11 +147,20 @@ class Split extends FlatMapFunction<String, String> {
JavaRDD<String> words = lines.flatMap(new Split());
{% endhighlight %}
+Java 8+ users can also write the above `FlatMapFunction` in a more concise way using
+a lambda expression:
+
+{% highlight java %}
+JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));
+{% endhighlight %}
+
+In Java 8, this lambda syntax can be applied to any anonymous class that implements a
+single-method (functional) interface.
+
Continuing with the word count example, we map each word to a `(word, 1)` pair:
{% highlight java %}
import scala.Tuple2;
-JavaPairRDD<String, Integer> ones = words.map(
+JavaPairRDD<String, Integer> ones = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2(s, 1);
@@ -140,7 +169,7 @@ JavaPairRDD<String, Integer> ones = words.map(
);
{% endhighlight %}
-Note that `map` was passed a `PairFunction<String, String, Integer>` and
+Note that `mapToPair` was passed a `PairFunction<String, String, Integer>` and
returned a `JavaPairRDD<String, Integer>`.
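+
+Java 8+ users can again express this with a lambda (a sketch equivalent to the anonymous
+class above):
+
+{% highlight java %}
+JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
+{% endhighlight %}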
To finish the word count program, we will use `reduceByKey` to count the
@@ -164,7 +193,7 @@ possible to chain the RDD transformations, so the word count example could also
be written as:
{% highlight java %}
-JavaPairRDD<String, Integer> counts = lines.flatMap(
+JavaPairRDD<String, Integer> counts = lines.flatMapToPair(
...
).map(
...
@@ -180,10 +209,11 @@ just a matter of style.
We currently provide documentation for the Java API as Scaladoc, in the
[`org.apache.spark.api.java` package](api/core/index.html#org.apache.spark.api.java.package), because
-some of the classes are implemented in Scala. The main downside is that the types and function
+some of the classes are implemented in Scala. It is important to note that the types and function
definitions show Scala syntax (for example, `def reduce(func: Function2[T, T]): T` instead of
-`T reduce(Function2<T, T> func)`).
-We hope to generate documentation with Java-style syntax in the future.
+`T reduce(Function2<T, T> func)`). In addition, the Scala `trait` modifier is used for Java
+interfaces. We hope to generate documentation with Java-style syntax in the future to
+avoid these quirks.
# Where to Go from Here
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index d552c47b22..6b49244ba4 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -45,7 +45,7 @@ public final class JavaHdfsLR {
double y;
}
- static class ParsePoint extends Function<String, DataPoint> {
+ static class ParsePoint implements Function<String, DataPoint> {
private static final Pattern SPACE = Pattern.compile(" ");
@Override
@@ -60,7 +60,7 @@ public final class JavaHdfsLR {
}
}
- static class VectorSum extends Function2<double[], double[], double[]> {
+ static class VectorSum implements Function2<double[], double[], double[]> {
@Override
public double[] call(double[] a, double[] b) {
double[] result = new double[D];
@@ -71,7 +71,7 @@ public final class JavaHdfsLR {
}
}
- static class ComputeGradient extends Function<DataPoint, double[]> {
+ static class ComputeGradient implements Function<DataPoint, double[]> {
private final double[] weights;
ComputeGradient(double[] weights) {
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
index 0dc879275a..2d797279d5 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
@@ -98,7 +98,7 @@ public final class JavaKMeans {
double tempDist;
do {
// allocate each vector to closest centroid
- JavaPairRDD<Integer, Vector> closest = data.map(
+ JavaPairRDD<Integer, Vector> closest = data.mapToPair(
new PairFunction<Vector, Integer, Vector>() {
@Override
public Tuple2<Integer, Vector> call(Vector vector) {
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 9eb1cadd71..a518fe2f27 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -108,7 +108,7 @@ public final class JavaLogQuery {
JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
- JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
+ JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index a84245b0c7..e53925b50c 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -42,7 +42,7 @@ import java.util.regex.Pattern;
public final class JavaPageRank {
private static final Pattern SPACES = Pattern.compile("\\s+");
- private static class Sum extends Function2<Double, Double, Double> {
+ private static class Sum implements Function2<Double, Double, Double> {
@Override
public Double call(Double a, Double b) {
return a + b;
@@ -66,7 +66,7 @@ public final class JavaPageRank {
JavaRDD<String> lines = ctx.textFile(args[1], 1);
// Loads all URLs from input file and initialize their neighbors.
- JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
+ JavaPairRDD<String, List<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = SPACES.split(s);
@@ -86,7 +86,7 @@ public final class JavaPageRank {
for (int current = 0; current < Integer.parseInt(args[2]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
- .flatMap(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
+ .flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
@Override
public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index 2ceb0fd94b..6cfe25c80e 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -50,7 +50,7 @@ public final class JavaTC {
return new ArrayList<Tuple2<Integer, Integer>>(edges);
}
- static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
+ static class ProjectFn implements PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
Integer, Integer> {
static final ProjectFn INSTANCE = new ProjectFn();
@@ -77,7 +77,7 @@ public final class JavaTC {
// the graph to obtain the path (x, z).
// Because join() joins on keys, the edges are stored in reversed order.
- JavaPairRDD<Integer, Integer> edges = tc.map(
+ JavaPairRDD<Integer, Integer> edges = tc.mapToPair(
new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
@@ -91,7 +91,7 @@ public final class JavaTC {
oldCount = nextCount;
// Perform the join, obtaining an RDD of (y, (z, x)) pairs,
// then project the result to obtain the new (x, z) paths.
- tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct().cache();
+ tc = tc.union(tc.join(edges).mapToPair(ProjectFn.INSTANCE)).distinct().cache();
nextCount = tc.count();
} while (nextCount != oldCount);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 6651f98d56..fa1b977ab1 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -49,7 +49,7 @@ public final class JavaWordCount {
}
});
- JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
+ JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
index 435a86e62a..64a3a04fb7 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
@@ -35,7 +35,7 @@ import scala.Tuple2;
*/
public final class JavaALS {
- static class ParseRating extends Function<String, Rating> {
+ static class ParseRating implements Function<String, Rating> {
private static final Pattern COMMA = Pattern.compile(",");
@Override
@@ -48,7 +48,7 @@ public final class JavaALS {
}
}
- static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> {
+ static class FeaturesToString implements Function<Tuple2<Object, double[]>, String> {
@Override
public String call(Tuple2<Object, double[]> element) {
return element._1() + "," + Arrays.toString(element._2());
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
index 4b2658f257..76ebdccfd6 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -32,7 +32,7 @@ import java.util.regex.Pattern;
*/
public final class JavaKMeans {
- static class ParsePoint extends Function<String, double[]> {
+ static class ParsePoint implements Function<String, double[]> {
private static final Pattern SPACE = Pattern.compile(" ");
@Override
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 21586ce817..667c72f379 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -34,7 +34,7 @@ import java.util.regex.Pattern;
*/
public final class JavaLR {
- static class ParsePoint extends Function<String, LabeledPoint> {
+ static class ParsePoint implements Function<String, LabeledPoint> {
private static final Pattern COMMA = Pattern.compile(",");
private static final Pattern SPACE = Pattern.compile(" ");
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 2ffd351b4e..d704be08d6 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -89,7 +89,7 @@ public final class JavaKafkaWordCount {
}
});
- JavaPairDStream<String, Integer> wordCounts = words.map(
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 7777c9832a..7f68d451e9 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -69,7 +69,7 @@ public final class JavaNetworkWordCount {
return Lists.newArrayList(SPACE.split(x));
}
});
- JavaPairDStream<String, Integer> wordCounts = words.map(
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index 26c44620ab..88ad341641 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -63,7 +63,7 @@ public final class JavaQueueStream {
// Create the QueueInputDStream and use it do some processing
JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
- JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
+ JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) {
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index c989ec0f27..b254e00714 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -75,7 +75,7 @@ object ZeroMQUtils {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
}
@@ -99,7 +99,7 @@ object ZeroMQUtils {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
}
@@ -122,7 +122,7 @@ object ZeroMQUtils {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
}
}
diff --git a/extras/README.md b/extras/README.md
new file mode 100644
index 0000000000..1b4174b7d5
--- /dev/null
+++ b/extras/README.md
@@ -0,0 +1 @@
+This directory contains build components not included by default in Spark's build.
diff --git a/extras/java8-tests/README.md b/extras/java8-tests/README.md
new file mode 100644
index 0000000000..e95b73ac77
--- /dev/null
+++ b/extras/java8-tests/README.md
@@ -0,0 +1,24 @@
+# Java 8 Test Suites
+
+These tests require having Java 8 installed and are isolated from the main Spark build.
+If Java 8 is not your system's default Java version, you will need to point Spark's build
+to your Java location. The set-up depends a bit on the build system:
+
+* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass
+ `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically
+ include the Java 8 test project.
+
+ `$ JAVA_HOME=/opt/jdk1.8.0/ sbt/sbt clean "test-only org.apache.spark.Java8APISuite"`
+
+* Maven users can also refer to their Java 8 directory using JAVA_HOME. However, Maven will
+  not automatically detect the presence of a Java 8 JDK, so the special build profile
+  `-Pjava8-tests` must be used.
+
+ `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests`
+ `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite`
+
+  Note that the above commands can only be run from the project root directory since this module
+ depends on core and the test-jars of core and streaming. This means an install step is
+ required to make the test dependencies visible to the Java 8 sub-project.
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
new file mode 100644
index 0000000000..602f66f9c5
--- /dev/null
+++ b/extras/java8-tests/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>java8-tests_2.10</artifactId>
+ <packaging>pom</packaging>
+ <name>Spark Project Java8 Tests POM</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>java8-tests</id>
+ </profile>
+ </profiles>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <systemPropertyVariables>
+ <!-- For some reason surefire isn't setting this log4j file on the
+ test classpath automatically. So we add it manually. -->
+ <log4j.configuration>
+ file:src/test/resources/log4j.properties
+ </log4j.configuration>
+ </systemPropertyVariables>
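+          <!-- Force these tests to run even when -DskipTests is set on the command
+               line (see docs/building-with-maven.md). -->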
+ <skipTests>false</skipTests>
+ <includes>
+ <include>**/Suite*.java</include>
+ <include>**/*Suite.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test-compile-first</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <fork>true</fork>
+ <verbose>true</verbose>
+ <forceJavacCompilerUse>true</forceJavacCompilerUse>
+ <source>1.8</source>
+ <compilerVersion>1.8</compilerVersion>
+ <target>1.8</target>
+ <encoding>UTF-8</encoding>
+ <maxmem>1024m</maxmem>
+ </configuration>
+ </plugin>
+ <plugin>
+        <!-- Disabled: the inherited scala-maven-plugin executions are bound to phase
+             "none" so this Java-only module skips Scala compilation entirely. -->
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>scala-test-compile-first</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>attach-scaladocs</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
new file mode 100644
index 0000000000..f67251217e
--- /dev/null
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
+
+/**
+ * Most of these tests replicate org.apache.spark.JavaAPISuite using Java 8
+ * lambda syntax.
+ */
+public class Java8APISuite implements Serializable {
+ static int foreachCalls = 0;
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaAPISuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port");
+ }
+
+ @Test
+ public void foreachWithAnonymousClass() {
+ foreachCalls = 0;
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+ rdd.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String s) {
+ foreachCalls++;
+ }
+ });
+ Assert.assertEquals(2, foreachCalls);
+ }
+
+ @Test
+ public void foreach() {
+ foreachCalls = 0;
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+ rdd.foreach((x) -> foreachCalls++);
+ Assert.assertEquals(2, foreachCalls);
+ }
+
+ @Test
+ public void groupBy() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+    Function<Integer, Boolean> isEven = x -> x % 2 == 0;
+    JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isEven);
+    Assert.assertEquals(2, oddsAndEvens.count());
+    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
+    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+
+    oddsAndEvens = rdd.groupBy(isEven, 1);
+ Assert.assertEquals(2, oddsAndEvens.count());
+ Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
+ Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+ }
+
+ @Test
+ public void leftOuterJoin() {
+ JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(1, 2),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(3, 1)
+ ));
+ JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Character>(1, 'x'),
+ new Tuple2<Integer, Character>(2, 'y'),
+ new Tuple2<Integer, Character>(2, 'z'),
+ new Tuple2<Integer, Character>(4, 'w')
+ ));
+ List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+ rdd1.leftOuterJoin(rdd2).collect();
+ Assert.assertEquals(5, joined.size());
+ Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+ rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
+ Assert.assertEquals(3, firstUnmatched._1().intValue());
+ }
+
+ @Test
+ public void foldReduce() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+ Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
+
+ int sum = rdd.fold(0, add);
+ Assert.assertEquals(33, sum);
+
+ sum = rdd.reduce(add);
+ Assert.assertEquals(33, sum);
+ }
+
+ @Test
+ public void foldByKey() {
+ List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(3, 2),
+ new Tuple2<Integer, Integer>(3, 1)
+ );
+ JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
+ Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
+ Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
+ Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
+ }
+
+ @Test
+ public void reduceByKey() {
+ List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(3, 2),
+ new Tuple2<Integer, Integer>(3, 1)
+ );
+ JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
+ Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
+ Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
+ Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
+
+ Map<Integer, Integer> localCounts = counts.collectAsMap();
+ Assert.assertEquals(1, localCounts.get(1).intValue());
+ Assert.assertEquals(2, localCounts.get(2).intValue());
+ Assert.assertEquals(3, localCounts.get(3).intValue());
+
+ localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
+ Assert.assertEquals(1, localCounts.get(1).intValue());
+ Assert.assertEquals(2, localCounts.get(2).intValue());
+ Assert.assertEquals(3, localCounts.get(3).intValue());
+ }
+
+ @Test
+ public void map() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
+ doubles.collect();
+ JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x))
+ .cache();
+ pairs.collect();
+ JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
+ strings.collect();
+ }
+
+ @Test
+ public void flatMap() {
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
+ "The quick brown fox jumps over the lazy dog."));
+ JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));
+
+ Assert.assertEquals("Hello", words.first());
+ Assert.assertEquals(11, words.count());
+
+ JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
+ List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>();
+ for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word));
+ return pairs2;
+ });
+
+ Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
+ Assert.assertEquals(11, pairs.count());
+
+ JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
+ List<Double> lengths = new LinkedList<Double>();
+ for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+ return lengths;
+ });
+
+    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    Assert.assertEquals(11, doubles.count());
+ }
+
+ @Test
+ public void mapsFromPairsToPairs() {
+ List<Tuple2<Integer, String>> pairs = Arrays.asList(
+ new Tuple2<Integer, String>(1, "a"),
+ new Tuple2<Integer, String>(2, "aa"),
+ new Tuple2<Integer, String>(3, "aaa")
+ );
+ JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+ // Regression test for SPARK-668:
+ JavaPairRDD<String, Integer> swapped =
+ pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
+ swapped.collect();
+
+ // There was never a bug here, but it's worth testing:
+ pairRDD.map(item -> item.swap()).collect();
+ }
+
+ @Test
+ public void mapPartitions() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+ JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
+ int sum = 0;
+ while (iter.hasNext()) {
+ sum += iter.next();
+ }
+ return Collections.singletonList(sum);
+ });
+
+ Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
+ }
+
+ @Test
+ public void sequenceFile() {
+ File tempDir = Files.createTempDir();
+ String outputDir = new File(tempDir, "output").getAbsolutePath();
+ List<Tuple2<Integer, String>> pairs = Arrays.asList(
+ new Tuple2<Integer, String>(1, "a"),
+ new Tuple2<Integer, String>(2, "aa"),
+ new Tuple2<Integer, String>(3, "aaa")
+ );
+ JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+ rdd.mapToPair(pair ->
+ new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+ // Try reading the output back as an object file
+ JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
+ .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
+ Assert.assertEquals(pairs, readRDD.collect());
+ }
+
+ @Test
+ public void zip() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
+ JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+ zipped.count();
+ }
+
+ @Test
+ public void zipPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
+ JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
+ FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
+ (Iterator<Integer> i, Iterator<String> s) -> {
+ int sizeI = 0;
+ int sizeS = 0;
+ while (i.hasNext()) {
+ sizeI += 1;
+ i.next();
+ }
+ while (s.hasNext()) {
+ sizeS += 1;
+ s.next();
+ }
+ return Arrays.asList(sizeI, sizeS);
+ };
+ JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
+ Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
+ }
+
+ @Test
+ public void accumulators() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+ final Accumulator<Integer> intAccum = sc.intAccumulator(10);
+ rdd.foreach(x -> intAccum.add(x));
+ Assert.assertEquals((Integer) 25, intAccum.value());
+
+ final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+ rdd.foreach(x -> doubleAccum.add((double) x));
+ Assert.assertEquals((Double) 25.0, doubleAccum.value());
+
+ // Try a custom accumulator type
+ AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+ public Float addInPlace(Float r, Float t) {
+ return r + t;
+ }
+
+ public Float addAccumulator(Float r, Float t) {
+ return r + t;
+ }
+
+ public Float zero(Float initialValue) {
+ return 0.0f;
+ }
+ };
+
+ final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+ rdd.foreach(x -> floatAccum.add((float) x));
+ Assert.assertEquals((Float) 25.0f, floatAccum.value());
+
+ // Test the setValue method
+ floatAccum.setValue(5.0f);
+ Assert.assertEquals((Float) 5.0f, floatAccum.value());
+ }
+
+ @Test
+ public void keyBy() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+ List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
+ Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+ Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+ }
+
+ @Test
+ public void mapOnPairRDD() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+ JavaPairRDD<Integer, Integer> rdd2 =
+ rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+ JavaPairRDD<Integer, Integer> rdd3 =
+ rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1()));
+ Assert.assertEquals(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(0, 2),
+ new Tuple2<Integer, Integer>(1, 3),
+ new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+ }
+
+ @Test
+ public void collectPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+ JavaPairRDD<Integer, Integer> rdd2 =
+ rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+ List[] parts = rdd1.collectPartitions(new int[]{0});
+ Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+ parts = rdd1.collectPartitions(new int[]{1, 2});
+ Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+ Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(2, 0)),
+ rdd2.collectPartitions(new int[]{0})[0]);
+
+ parts = rdd2.collectPartitions(new int[]{1, 2});
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+ new Tuple2<Integer, Integer>(4, 0)), parts[0]);
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+ new Tuple2<Integer, Integer>(6, 0),
+ new Tuple2<Integer, Integer>(7, 1)), parts[1]);
+ }
+
+ @Test
+ public void collectAsMapWithIntArrayValues() {
+ // Regression test for SPARK-1040
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
+ JavaPairRDD<Integer, int[]> pairRDD =
+ rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x}));
+ pairRDD.collect(); // Works fine
+ Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+ }
+}
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
new file mode 100644
index 0000000000..43df0dea61
--- /dev/null
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -0,0 +1,841 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+
+/**
+ * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using Java 8
+ * lambda syntax.
+ */
+public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
+
+ @Test
+ public void testMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("goodnight", "moon"));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(5, 5),
+ Arrays.asList(9, 4));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(s -> s.length());
+ JavaTestUtils.attachTestOutputStream(letterCount);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("yankees"));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testMapPartitions() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("GIANTSDODGERS"),
+ Arrays.asList("YANKEESRED SOCKS"));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> mapped = stream.mapPartitions(in -> {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ });
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduce() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6),
+ Arrays.asList(7, 8, 9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(15),
+ Arrays.asList(24));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByWindow() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6),
+ Arrays.asList(7, 8, 9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(21),
+ Arrays.asList(39),
+ Arrays.asList(24));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
+ (x, y) -> x - y, new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reducedWindowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testTransform() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6),
+ Arrays.asList(7, 8, 9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3, 4, 5),
+ Arrays.asList(6, 7, 8),
+ Arrays.asList(9, 10, 11));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
+
+ JavaTestUtils.attachTestOutputStream(transformed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testVariousTransform() {
+ // tests whether all variations of transform can be called from Java
+
+ List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+ JavaDStream<Integer> transformed1 = stream.transform(in -> null);
+ JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
+ JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
+ JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
+ JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
+ JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
+ JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
+ JavaPairDStream<String, String> pairTransformed4 =
+ pairStream.transformToPair((x, time) -> null);
+
+ }
+
+ @Test
+ public void testTransformWith() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(
+ new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Sets.newHashSet(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Sets.newHashSet(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined =
+      pairStream1.transformWithToPair(pairStream2, (x, y, z) -> x.join(y));
+
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+ for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
+ unorderedResult.add(Sets.newHashSet(res));
+ }
+
+ Assert.assertEquals(expected, unorderedResult);
+ }
+
+
+ @Test
+ public void testVariousTransformWith() {
+ // tests whether all variations of transformWith can be called from Java
+
+ List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+ List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
+ JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData1 =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ List<List<Tuple2<Double, Character>>> pairInputData2 =
+ Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+ JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+ JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+    JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
+    JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> transformed3 =
+      stream1.transformWithToPair(stream2, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> transformed4 =
+      stream1.transformWithToPair(pairStream1, (x, y, z) -> null);
+
+    JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2, (x, y, z) -> null);
+
+    JavaDStream<Double> pairTransformed2 =
+      pairStream1.transformWith(pairStream1, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> pairTransformed3 =
+      pairStream1.transformWithToPair(stream2, (x, y, z) -> null);
+
+    JavaPairDStream<Double, Double> pairTransformed4 =
+      pairStream1.transformWithToPair(pairStream2, (x, y, z) -> null);
+ }
+
+ @Test
+ public void testStreamingContextTransform() {
+ List<List<Integer>> stream1input = Arrays.asList(
+ Arrays.asList(1),
+ Arrays.asList(2)
+ );
+
+ List<List<Integer>> stream2input = Arrays.asList(
+ Arrays.asList(3),
+ Arrays.asList(4)
+ );
+
+ List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
+ Arrays.asList(new Tuple2<Integer, String>(1, "x")),
+ Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+ );
+
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
+ Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+ );
+
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
+ JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
+ JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
+
+ List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+
+ // This is just to test whether this transform to JavaStream compiles
+ JavaDStream<Long> transformed1 = ssc.transform(
+ listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+ assert (listOfRDDs.size() == 2);
+ return null;
+ });
+
+ List<JavaDStream<?>> listOfDStreams2 =
+ Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+
+ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
+ listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+ assert (listOfRDDs.size() == 3);
+ JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
+ JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
+ JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
+ JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+ PairFunction<Integer, Integer, Integer> mapToTuple =
+ (Integer i) -> new Tuple2<Integer, Integer>(i, i);
+ return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
+ });
+ JavaTestUtils.attachTestOutputStream(transformed2);
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
+ JavaTestUtils.runStreams(ssc, 2, 2);
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("go", "giants"),
+ Arrays.asList("boo", "dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
+ Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
+ Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)")));
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testPairFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(6, "g"),
+ new Tuple2<Integer, String>(6, "i"),
+ new Tuple2<Integer, String>(6, "a"),
+ new Tuple2<Integer, String>(6, "n"),
+ new Tuple2<Integer, String>(6, "t"),
+ new Tuple2<Integer, String>(6, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "o"),
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "g"),
+ new Tuple2<Integer, String>(7, "e"),
+ new Tuple2<Integer, String>(7, "r"),
+ new Tuple2<Integer, String>(7, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(9, "a"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "h"),
+ new Tuple2<Integer, String>(9, "l"),
+ new Tuple2<Integer, String>(9, "e"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "i"),
+ new Tuple2<Integer, String>(9, "c"),
+ new Tuple2<Integer, String>(9, "s")));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter : s.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(s.length(), letter));
+ }
+ return out;
+ });
+
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ /*
+ * Performs an order-invariant comparison of lists representing two RDD streams. This allows
+ * us to account for ordering variation within individual RDDs, which occurs during windowing.
+ */
+ public static <T extends Comparable<T>> void assertOrderInvariantEquals(
+ List<List<T>> expected, List<List<T>> actual) {
+ for (List<T> list : expected) {
+ Collections.sort(list);
+ }
+ for (List<T> list : actual) {
+ Collections.sort(list);
+ }
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testPairFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
+ Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream =
+ stream.mapToPair(x -> new Tuple2<>(x, x.length()));
+ JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "yankees"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "rangers"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+ List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 1),
+ new Tuple2<String, Integer>("california", 3),
+ new Tuple2<String, Integer>("new york", 4),
+ new Tuple2<String, Integer>("new york", 1)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("new york", 3),
+ new Tuple2<String, Integer>("new york", 1)));
+
+ @Test
+ public void testPairMap() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMapPartitions() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
+ LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ while (in.hasNext()) {
+ Tuple2<String, Integer> next = in.next();
+ out.add(next.swap());
+ }
+ return out;
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMap2() { // Maps pair -> single
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1, 3, 4, 1),
+ Arrays.asList(5, 5, 3, 1));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> values = pairStream.map(in -> in._2());
+ JavaTestUtils.attachTestOutputStream(values);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
+ List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
+ List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ for (Character s : in._1().toCharArray()) {
+ out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+ }
+ return out;
+ });
+
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairReduceByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
+
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCombineByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
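+ // The explicit <Integer> type witness fixes the combiner type C up front,
+ // rather than leaving javac to infer it across three implicitly typed lambdas.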
+ JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
+ (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
+
+ JavaTestUtils.attachTestOutputStream(combined);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindow() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
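+ // Window = 2s, slide = 1s over 1s batches: the middle output batch spans
+ // both input batches, hence "california" -> 4 + 10 = 14.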
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testUpdateStateByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
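+ // State is threaded between batches as an Optional: absent the first time a
+ // key appears, then carrying the running sum forward.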
+ JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v : values) {
+ out = out + v;
+ }
+ return Optional.of(out);
+ });
+
+ JavaTestUtils.attachTestOutputStream(updated);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindowWithInverse() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
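+ // The second function is the inverse of the first; it lets Spark update each
+ // window incrementally by subtracting values that slide out of the window
+ // instead of re-reducing the whole window.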
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
+ new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
+
+ JavaTestUtils.attachTestOutputStream(sorted);
+ List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToNormalRDDTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3, 1, 4, 2),
+ Arrays.asList(2, 3, 4, 1));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
+ JavaTestUtils.attachTestOutputStream(firstParts);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
+ new Tuple2<String, String>("california", "GIANTS"),
+ new Tuple2<String, String>("new york", "YANKEES"),
+ new Tuple2<String, String>("new york", "METS")),
+ Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
+ new Tuple2<String, String>("california", "DUCKS"),
+ new Tuple2<String, String>("new york", "RANGERS"),
+ new Tuple2<String, String>("new york", "ISLANDERS")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase());
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
+ new Tuple2<String, String>("california", "dodgers2"),
+ new Tuple2<String, String>("california", "giants1"),
+ new Tuple2<String, String>("california", "giants2"),
+ new Tuple2<String, String>("new york", "yankees1"),
+ new Tuple2<String, String>("new york", "yankees2"),
+ new Tuple2<String, String>("new york", "mets1"),
+ new Tuple2<String, String>("new york", "mets2")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
+ new Tuple2<String, String>("california", "sharks2"),
+ new Tuple2<String, String>("california", "ducks1"),
+ new Tuple2<String, String>("california", "ducks2"),
+ new Tuple2<String, String>("new york", "rangers1"),
+ new Tuple2<String, String>("new york", "rangers2"),
+ new Tuple2<String, String>("new york", "islanders1"),
+ new Tuple2<String, String>("new york", "islanders2")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
+ List<String> out = new ArrayList<String>();
+ out.add(in + "1");
+ out.add(in + "2");
+ return out;
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ Assert.assertEquals(expected, result);
+ }
+
+}
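The lambda syntax used throughout the suite above is exactly what this patch enables: every type in org.apache.spark.api.java.function now has a single abstract call method, so lambdas and method references are valid instances. A minimal, self-contained sketch (the class name LambdaTarget is illustrative, not part of the patch):

    import org.apache.spark.api.java.function.Function2;

    public class LambdaTarget {
      public static void main(String[] args) throws Exception {
        // A lambda implements the single abstract method call(T1, T2).
        Function2<Integer, Integer, Integer> sum = (a, b) -> a + b;
        // Method references satisfy the same single-method shape.
        Function2<Integer, Integer, Integer> max = Math::max;
        System.out.println(sum.call(1, 2)); // 3
        System.out.println(max.call(1, 2)); // 2
      }
    }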
diff --git a/extras/java8-tests/src/test/resources/log4j.properties b/extras/java8-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..180beaa8cc
--- /dev/null
+++ b/extras/java8-tests/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
diff --git a/pom.xml b/pom.xml
index 7e28d7c194..c59fada5cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -711,6 +711,31 @@
</modules>
</profile>
+ <profile>
+ <id>java8-tests</id>
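+ <!-- Activated explicitly, e.g. "mvn test -Pjava8-tests"; building it requires JDK 8. -->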
+ <build>
+ <plugins>
+ <!-- Needed to publish test jars, which the java8-tests module depends on -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <modules>
+ <module>extras/java8-tests</module>
+ </modules>
+
+ </profile>
<profile>
<id>yarn</id>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d45f6773fa..aa17848975 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -90,6 +90,14 @@ object SparkBuild extends Build {
}
lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client"
val maybeAvro = if (hadoopVersion.startsWith("0.23.") && isYarnEnabled) Seq("org.apache.avro" % "avro" % "1.7.4") else Seq()
+
+ // Conditionally include the Java 8 sub-project
+ lazy val javaVersion = System.getProperty("java.specification.version")
+ lazy val isJava8Enabled = javaVersion.toDouble >= 1.8
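+ // java.specification.version is "1.8" on a Java 8 JVM (and a bare "9", "10", ...
+ // on later ones), so a numeric comparison is enough to detect lambda support.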
+ val maybeJava8Tests = if (isJava8Enabled) Seq[ProjectReference](java8Tests) else Seq[ProjectReference]()
+ lazy val java8Tests = Project("java8-tests", file("extras/java8-tests"), settings = java8TestsSettings).
+ dependsOn(core) dependsOn(streaming % "compile->compile;test->test")
+
// Conditionally include the yarn sub-project
lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
lazy val yarn = Project("yarn", file("yarn/stable"), settings = yarnSettings) dependsOn(core)
@@ -118,10 +126,11 @@ object SparkBuild extends Build {
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
.dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
- // Everything except assembly, tools and examples belong to packageProjects
+ // Everything except assembly, tools, java8Tests and examples belongs to packageProjects
lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef
- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
+ lazy val allProjects = packageProjects ++ allExternalRefs ++
+ Seq[ProjectReference](examples, tools, assemblyProj) ++ maybeJava8Tests
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
@@ -132,6 +141,7 @@ object SparkBuild extends Build {
javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
+ javaHome := Properties.envOrNone("JAVA_HOME").map(file),
// This is to add convenience of enabling sbt -Dsbt.offline=true for making the build offline.
offline := "true".equalsIgnoreCase(sys.props("sbt.offline")),
retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
@@ -370,6 +380,12 @@ object SparkBuild extends Build {
name := "spark-yarn"
)
+ def java8TestsSettings = sharedSettings ++ Seq(
+ name := "java8-tests",
+ javacOptions := Seq("-target", "1.8", "-source", "1.8"),
+ testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
+ )
+
// Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
// if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
def extraYarnSettings = if(isYarnEnabled) yarnEnabledSettings else Seq()
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index d65bbdc19c..00a6b41013 100755
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -16,7 +16,14 @@ declare -a residual_args
declare -a java_args
declare -a scalac_args
declare -a sbt_commands
-declare java_cmd=java
+
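+# Prefer the JVM under JAVA_HOME when one exists; the -java-home flag parsed
+# in process_args below still takes precedence, since it reassigns java_cmd.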
+if test -x "$JAVA_HOME/bin/java"; then
+ echo -e "Using $JAVA_HOME as default JAVA_HOME."
+ echo "Note, this will be overridden by -java-home if it is set."
+ declare java_cmd="$JAVA_HOME/bin/java"
+else
+ declare java_cmd=java
+fi
echoerr () {
echo 1>&2 "$@"
@@ -131,7 +138,7 @@ process_args () {
-sbt-jar) require_arg path "$1" "$2" && sbt_jar="$2" && shift 2 ;;
-sbt-version) require_arg version "$1" "$2" && sbt_version="$2" && shift 2 ;;
- -java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && shift 2 ;;
+ -java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && export JAVA_HOME=$2 && shift 2 ;;
-D*) addJava "$1" && shift ;;
-J*) addJava "${1:2}" && shift ;;
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index e23b725052..721d502732 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -41,7 +41,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
- dstream.filter((x => f(x).booleanValue()))
+ dstream.filter((x => f.call(x).booleanValue()))
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaDStream[T] = dstream.cache()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 7aa7ead29b..a85cd04c93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,19 +17,20 @@
package org.apache.spark.streaming.api.java
-import java.util.{List => JList}
+import java.util
import java.lang.{Long => JLong}
+import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import org.apache.spark.streaming._
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
-import java.util
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _}
import org.apache.spark.rdd.RDD
-import JavaDStream._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api.java.JavaDStream._
import org.apache.spark.streaming.dstream.DStream
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
@@ -123,23 +124,23 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
- def glom(): JavaDStream[JList[T]] = {
+ def glom(): JavaDStream[JList[T]] =
new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
- }
+
/** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */
- def context(): StreamingContext = dstream.context()
+ def context(): StreamingContext = dstream.context
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[R](f: JFunction[T, R]): JavaDStream[R] = {
- new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+ new JavaDStream(dstream.map(f)(fakeClassTag))(fakeClassTag)
}
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
- new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
+ def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def cm: ClassTag[(K2, V2)] = fakeClassTag
+ new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -148,19 +149,19 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType())
+ def fn = (x: T) => f.call(x).asScala
+ new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
- new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
+ def fn = (x: T) => f.call(x).asScala
+ def cm: ClassTag[(K2, V2)] = fakeClassTag
+ new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -169,8 +170,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of the RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
@@ -178,10 +179,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
- def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -283,8 +284,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cm: ClassTag[U] = fakeClassTag
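+ // fakeClassTag is ClassTag.AnyRef cast to ClassTag[U]; that is safe here
+ // because the Java API erases element types to Object anyway.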
+
def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -295,8 +296,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cm: ClassTag[U] = fakeClassTag
+
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@ -306,12 +307,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
+ def transformToPair[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmk: ClassTag[K2] = fakeClassTag
+ implicit val cmv: ClassTag[V2] = fakeClassTag
+
def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -321,12 +321,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
+ def transformToPair[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmk: ClassTag[K2] = fakeClassTag
+ implicit val cmv: ClassTag[V2] = fakeClassTag
+
def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@ -340,10 +339,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
): JavaDStream[W] = {
- implicit val cmu: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
- implicit val cmv: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cmu: ClassTag[U] = fakeClassTag
+ implicit val cmv: ClassTag[W] = fakeClassTag
+
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
@@ -353,16 +351,13 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[U, K2, V2](
+ def transformWithToPair[U, K2, V2](
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
): JavaPairDStream[K2, V2] = {
- implicit val cmu: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
- implicit val cmk2: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv2: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmu: ClassTag[U] = fakeClassTag
+ implicit val cmk2: ClassTag[K2] = fakeClassTag
+ implicit val cmv2: ClassTag[V2] = fakeClassTag
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
@@ -376,12 +371,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
): JavaDStream[W] = {
- implicit val cmk2: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv2: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
- implicit val cmw: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cmk2: ClassTag[K2] = fakeClassTag
+ implicit val cmv2: ClassTag[V2] = fakeClassTag
+ implicit val cmw: ClassTag[W] = fakeClassTag
+
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
@@ -391,18 +384,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[K2, V2, K3, V3](
+ def transformWithToPair[K2, V2, K3, V3](
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
): JavaPairDStream[K3, V3] = {
- implicit val cmk2: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv2: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
- implicit val cmk3: ClassTag[K3] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
- implicit val cmv3: ClassTag[V3] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
+ implicit val cmk2: ClassTag[K2] = fakeClassTag
+ implicit val cmv2: ClassTag[V2] = fakeClassTag
+ implicit val cmk3: ClassTag[K3] = fakeClassTag
+ implicit val cmv3: ClassTag[V3] = fakeClassTag
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
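A note on the renames in this file: once lambdas can target these function types, overloads such as map(Function) vs. map(PairFunction) become ambiguous at implicitly typed lambda call sites, so the pair-returning variants get distinct names (mapToPair, flatMapToPair, mapPartitionsToPair, transformToPair, transformWithToPair); the explicit f.call(...) invocations likewise replace Scala's apply sugar now that the function types are plain Java interfaces. A self-contained sketch of the ambiguity, with hypothetical Fn/PairFn interfaces standing in for the real ones:

    import java.util.AbstractMap.SimpleEntry;

    public class WhyMapToPair {
      // Two single-abstract-method interfaces, analogous to Function and PairFunction.
      interface Fn<T, R> { R call(T t); }
      interface PairFn<T, K, V> { SimpleEntry<K, V> call(T t); }

      // If both of these were overloads named "map", the implicitly typed lambdas
      // in main would match either interface and fail to compile as ambiguous.
      // Distinct names resolve the target interface up front.
      static <R> void map(Fn<String, R> f) { }
      static <K, V> void mapToPair(PairFn<String, K, V> f) { }

      public static void main(String[] args) {
        map(s -> s.length());
        mapToPair(s -> new SimpleEntry<>(s, s.length()));
      }
    }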
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2c7ff87744..ac451d1913 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,24 +17,25 @@
package org.apache.spark.streaming.api.java
-import java.util.{List => JList}
import java.lang.{Long => JLong}
+import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3}
-import org.apache.spark.Partitioner
+import com.google.common.base.Optional
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD}
-import org.apache.spark.storage.StorageLevel
-import com.google.common.base.Optional
+import org.apache.spark.Partitioner
+import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
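+// The JFunction/JFunction2 aliases keep the Java API's function types distinct
+// from scala.Function1/Function2 within this file.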
import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.PairRDDFunctions
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
/**
@@ -54,7 +55,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
- dstream.filter((x => f(x).booleanValue()))
+ dstream.filter((x => f.call(x).booleanValue()))
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaPairDStream[K, V] = dstream.cache()
@@ -168,8 +169,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner
): JavaPairDStream[K, C] = {
- implicit val cm: ClassTag[C] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ implicit val cm: ClassTag[C] = fakeClassTag
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
@@ -184,8 +184,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
partitioner: Partitioner,
mapSideCombine: Boolean
): JavaPairDStream[K, C] = {
- implicit val cm: ClassTag[C] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ implicit val cm: ClassTag[C] = fakeClassTag
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
}
@@ -279,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream's batching interval
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
):JavaPairDStream[K, V] = {
@@ -299,7 +298,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
@@ -320,7 +319,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
@@ -345,8 +344,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream's batching interval
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- invReduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
+ invReduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
): JavaPairDStream[K, V] = {
@@ -374,8 +373,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- invReduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
+ invReduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int,
@@ -412,8 +411,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- invReduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
+ invReduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner,
@@ -453,8 +452,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
- implicit val cm: ClassTag[S] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+ implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}
@@ -471,8 +469,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
- implicit val cm: ClassTag[S] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+ implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
}
@@ -490,8 +487,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
- implicit val cm: ClassTag[S] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+ implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
}
@@ -501,8 +497,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* 'this' DStream without changing the key.
*/
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
- implicit val cm: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cm: ClassTag[U] = fakeClassTag
dstream.mapValues(f)
}
@@ -524,8 +519,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -537,8 +531,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, numPartitions)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -551,8 +544,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, partitioner)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -562,8 +554,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream)
}
@@ -572,8 +563,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream, numPartitions)
}
@@ -585,8 +575,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream, partitioner)
}
@@ -596,8 +585,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* number of partitions.
*/
def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -611,8 +599,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -625,8 +612,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -652,8 +638,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -667,8 +652,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -748,8 +732,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
new JavaDStream[(K, V)](dstream)
}
- override val classTag: ClassTag[(K, V)] =
- implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+ override val classTag: ClassTag[(K, V)] = fakeClassTag
}
object JavaPairDStream {
@@ -758,10 +741,8 @@ object JavaPairDStream {
}
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
- implicit val cmk: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val cmv: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ implicit val cmk: ClassTag[K] = fakeClassTag
+ implicit val cmv: ClassTag[V] = fakeClassTag
new JavaPairDStream[K, V](dstream.dstream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b082bb0585..c48d754e43 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -187,7 +187,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
: JavaDStream[T] = {
- def fn = (x: InputStream) => converter.apply(x).toIterator
+ def fn = (x: InputStream) => converter.call(x).toIterator
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.socketStream(hostname, port, fn, storageLevel)
@@ -431,7 +431,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
* a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
- def transform[K, V](
+ def transformToPair[K, V](
dstreams: JList[JavaDStream[_]],
transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
): JavaPairDStream[K, V] = {
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 54a0791d04..e93bf18b6d 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -247,14 +247,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
- private class IntegerSum extends Function2<Integer, Integer, Integer> {
+ private class IntegerSum implements Function2<Integer, Integer, Integer> {
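+ // Function2 is now an interface rather than an abstract class, hence "implements".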
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
}
- private class IntegerDifference extends Function2<Integer, Integer, Integer> {
+ private class IntegerDifference implements Function2<Integer, Integer, Integer> {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 - i2;
@@ -392,7 +392,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, Integer> transformed3 = stream.transform(
+ JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(
new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
return null;
@@ -400,7 +400,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, Integer> transformed4 = stream.transform(
+ JavaPairDStream<String, Integer> transformed4 = stream.transformToPair(
new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
return null;
@@ -424,7 +424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, String> pairTransformed3 = pairStream.transform(
+ JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
@Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
return null;
@@ -432,7 +432,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, String> pairTransformed4 = pairStream.transform(
+ JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair(
new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
@Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
return null;
@@ -482,7 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
- JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
pairStream2,
new Function3<
JavaPairRDD<String, String>,
@@ -551,7 +551,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> transformed3 = stream1.transformWith(
+ JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair(
stream2,
new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -561,7 +561,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> transformed4 = stream1.transformWith(
+ JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair(
pairStream1,
new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -591,7 +591,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith(
+ JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair(
stream2,
new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -601,7 +601,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
+ JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair(
pairStream2,
new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -656,7 +656,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<JavaDStream<?>> listOfDStreams2 =
Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
- JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform(
+ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
listOfDStreams2,
new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
@@ -671,7 +671,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
return new Tuple2<Integer, Integer>(i, i);
}
};
- return rdd1.union(rdd2).map(mapToTuple).join(prdd3);
+ return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
}
}
);
@@ -742,17 +742,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<Integer, String>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
- new PairFlatMapFunction<String, Integer, String>() {
- @Override
- public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
- for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<Integer, String>(in.length(), letter));
- }
- return out;
+ JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
+ new PairFlatMapFunction<String, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(in.length(), letter));
}
- });
+ return out;
+ }
+ });
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -816,7 +816,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = stream.map(
+ JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String in) throws Exception {
@@ -880,7 +880,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.map(
+ JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
@@ -913,7 +913,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions(
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
@@ -983,7 +983,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
@@ -1228,7 +1228,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1300,7 +1301,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, inputData, 1);
JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
@Override
public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
@@ -1632,7 +1633,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testSocketString() {
- class Converter extends Function<InputStream, Iterable<String>> {
+
+ class Converter implements Function<InputStream, Iterable<String>> {
public Iterable<String> call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
List<String> out = new ArrayList<String>();