diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2014-03-03 22:31:30 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-03 22:31:30 -0800 |
commit | 181ec5030792a10f3ce77e997d0e2eda9bcd6139 (patch) | |
tree | 9b88504e5a3eca8177e4ebe1257ea9ce56120c13 /streaming | |
parent | b14ede789abfabe25144385e8dc2fb96691aba81 (diff) | |
download | spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.gz spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.bz2 spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.zip |
[java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs
Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>
Closes #17 from ScrapCodes/java8-lambdas and squashes the following commits:
95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch.
85a954e [Prashant Sharma] Nit. import orderings.
673f7ac [Prashant Sharma] Added support for -java-home as well
80a13e8 [Prashant Sharma] Used fake class tag syntax
26eb3f6 [Prashant Sharma] Patrick's comments on PR.
35d8d79 [Prashant Sharma] Specified java 8 building in the docs
31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag.
4ab87d3 [Prashant Sharma] Review feedback on the pr
c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
Diffstat (limited to 'streaming')
5 files changed, 130 insertions, 158 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index e23b725052..721d502732 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -41,7 +41,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T] /** Return a new DStream containing only the elements that satisfy a predicate. */ def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = - dstream.filter((x => f(x).booleanValue())) + dstream.filter((x => f.call(x).booleanValue())) /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def cache(): JavaDStream[T] = dstream.cache() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 7aa7ead29b..a85cd04c93 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -17,19 +17,20 @@ package org.apache.spark.streaming.api.java -import java.util.{List => JList} +import java.util import java.lang.{Long => JLong} +import java.util.{List => JList} import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import org.apache.spark.streaming._ -import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD} -import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import org.apache.spark.api.java.function.{Function3 => JFunction3, _} -import java.util +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike} +import org.apache.spark.api.java.JavaPairRDD._ +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _} import org.apache.spark.rdd.RDD -import JavaDStream._ +import org.apache.spark.streaming._ +import org.apache.spark.streaming.api.java.JavaDStream._ import org.apache.spark.streaming.dstream.DStream trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]] @@ -123,23 +124,23 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * this DStream. Applying glom() to an RDD coalesces all elements within each partition into * an array. */ - def glom(): JavaDStream[JList[T]] = { + def glom(): JavaDStream[JList[T]] = new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) - } + /** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */ - def context(): StreamingContext = dstream.context() + def context(): StreamingContext = dstream.context /** Return a new DStream by applying a function to all elements of this DStream. */ def map[R](f: JFunction[T, R]): JavaDStream[R] = { - new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType()) + new JavaDStream(dstream.map(f)(fakeClassTag))(fakeClassTag) } /** Return a new DStream by applying a function to all elements of this DStream. */ - def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { - def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] - new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) + def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { + def cm: ClassTag[(K2, V2)] = fakeClassTag + new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2]) } /** @@ -148,19 +149,19 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T */ def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = { import scala.collection.JavaConverters._ - def fn = (x: T) => f.apply(x).asScala - new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType()) + def fn = (x: T) => f.call(x).asScala + new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U]) } /** * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ - def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { + def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { import scala.collection.JavaConverters._ - def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]] - new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) + def fn = (x: T) => f.call(x).asScala + def cm: ClassTag[(K2, V2)] = fakeClassTag + new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2]) } /** @@ -169,8 +170,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * of the RDD. */ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) - new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) + def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U]) } /** @@ -178,10 +179,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition * of the RDD. */ - def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]) + def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]) : JavaPairDStream[K2, V2] = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) - new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType()) + def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2]) } /** @@ -283,8 +284,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = { - implicit val cm: ClassTag[U] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cm: ClassTag[U] = fakeClassTag + def scalaTransform (in: RDD[T]): RDD[U] = transformFunc.call(wrapRDD(in)).rdd dstream.transform(scalaTransform(_)) @@ -295,8 +296,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = { - implicit val cm: ClassTag[U] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cm: ClassTag[U] = fakeClassTag + def scalaTransform (in: RDD[T], time: Time): RDD[U] = transformFunc.call(wrapRDD(in), time).rdd dstream.transform(scalaTransform(_, _)) @@ -306,12 +307,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): + def transformToPair[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { - implicit val cmk: ClassTag[K2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] - implicit val cmv: ClassTag[V2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + implicit val cmk: ClassTag[K2] = fakeClassTag + implicit val cmv: ClassTag[V2] = fakeClassTag + def scalaTransform (in: RDD[T]): RDD[(K2, V2)] = transformFunc.call(wrapRDD(in)).rdd dstream.transform(scalaTransform(_)) @@ -321,12 +321,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream. */ - def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): + def transformToPair[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { - implicit val cmk: ClassTag[K2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] - implicit val cmv: ClassTag[V2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + implicit val cmk: ClassTag[K2] = fakeClassTag + implicit val cmv: ClassTag[V2] = fakeClassTag + def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] = transformFunc.call(wrapRDD(in), time).rdd dstream.transform(scalaTransform(_, _)) @@ -340,10 +339,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaDStream[U], transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]] ): JavaDStream[W] = { - implicit val cmu: ClassTag[U] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] - implicit val cmv: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cmu: ClassTag[U] = fakeClassTag + implicit val cmv: ClassTag[W] = fakeClassTag + def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _)) @@ -353,16 +351,13 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream and 'other' DStream. */ - def transformWith[U, K2, V2]( + def transformWithToPair[U, K2, V2]( other: JavaDStream[U], transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]] ): JavaPairDStream[K2, V2] = { - implicit val cmu: ClassTag[U] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] - implicit val cmk2: ClassTag[K2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] - implicit val cmv2: ClassTag[V2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] + implicit val cmu: ClassTag[U] = fakeClassTag + implicit val cmk2: ClassTag[K2] = fakeClassTag + implicit val cmv2: ClassTag[V2] = fakeClassTag def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _)) @@ -376,12 +371,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T other: JavaPairDStream[K2, V2], transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]] ): JavaDStream[W] = { - implicit val cmk2: ClassTag[K2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] - implicit val cmv2: ClassTag[V2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] - implicit val cmw: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cmk2: ClassTag[K2] = fakeClassTag + implicit val cmv2: ClassTag[V2] = fakeClassTag + implicit val cmw: ClassTag[W] = fakeClassTag + def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _)) @@ -391,18 +384,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Return a new DStream in which each RDD is generated by applying a function * on each RDD of 'this' DStream and 'other' DStream. */ - def transformWith[K2, V2, K3, V3]( + def transformWithToPair[K2, V2, K3, V3]( other: JavaPairDStream[K2, V2], transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]] ): JavaPairDStream[K3, V3] = { - implicit val cmk2: ClassTag[K2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]] - implicit val cmv2: ClassTag[V2] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]] - implicit val cmk3: ClassTag[K3] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]] - implicit val cmv3: ClassTag[V3] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]] + implicit val cmk2: ClassTag[K2] = fakeClassTag + implicit val cmv2: ClassTag[V2] = fakeClassTag + implicit val cmk3: ClassTag[K3] = fakeClassTag + implicit val cmv3: ClassTag[V3] = fakeClassTag def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 2c7ff87744..ac451d1913 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -17,24 +17,25 @@ package org.apache.spark.streaming.api.java -import java.util.{List => JList} import java.lang.{Long => JLong} +import java.util.{List => JList} import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import org.apache.spark.streaming._ -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3} -import org.apache.spark.Partitioner +import com.google.common.base.Optional +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.conf.Configuration -import org.apache.spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD} -import org.apache.spark.storage.StorageLevel -import com.google.common.base.Optional +import org.apache.spark.Partitioner +import org.apache.spark.api.java.{JavaPairRDD, JavaUtils} +import org.apache.spark.api.java.JavaPairRDD._ +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.PairRDDFunctions +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream /** @@ -54,7 +55,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** Return a new DStream containing only the elements that satisfy a predicate. */ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = - dstream.filter((x => f(x).booleanValue())) + dstream.filter((x => f.call(x).booleanValue())) /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def cache(): JavaPairDStream[K, V] = dstream.cache() @@ -168,8 +169,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner ): JavaPairDStream[K, C] = { - implicit val cm: ClassTag[C] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] + implicit val cm: ClassTag[C] = fakeClassTag dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) } @@ -184,8 +184,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( partitioner: Partitioner, mapSideCombine: Boolean ): JavaPairDStream[K, C] = { - implicit val cm: ClassTag[C] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] + implicit val cm: ClassTag[C] = fakeClassTag dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine) } @@ -279,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * DStream's batching interval */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration ):JavaPairDStream[K, V] = { @@ -299,7 +298,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param numPartitions Number of partitions of each RDD in the new DStream. */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration, numPartitions: Int @@ -320,7 +319,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * DStream. */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner @@ -345,8 +344,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * DStream's batching interval */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], - invReduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], + invReduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration ): JavaPairDStream[K, V] = { @@ -374,8 +373,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * set this to null if you do not want to filter */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], - invReduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], + invReduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration, numPartitions: Int, @@ -412,8 +411,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * set this to null if you do not want to filter */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], - invReduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], + invReduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner, @@ -453,8 +452,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( */ def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]) : JavaPairDStream[K, S] = { - implicit val cm: ClassTag[S] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] + implicit val cm: ClassTag[S] = fakeClassTag dstream.updateStateByKey(convertUpdateStateFunction(updateFunc)) } @@ -471,8 +469,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], numPartitions: Int) : JavaPairDStream[K, S] = { - implicit val cm: ClassTag[S] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] + implicit val cm: ClassTag[S] = fakeClassTag dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) } @@ -490,8 +487,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], partitioner: Partitioner ): JavaPairDStream[K, S] = { - implicit val cm: ClassTag[S] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] + implicit val cm: ClassTag[S] = fakeClassTag dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) } @@ -501,8 +497,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * 'this' DStream without changing the key. */ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { - implicit val cm: ClassTag[U] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cm: ClassTag[U] = fakeClassTag dstream.mapValues(f) } @@ -524,8 +519,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * of partitions. */ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -537,8 +531,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream, numPartitions) .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -551,8 +544,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream, partitioner) .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -562,8 +554,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.join(other.dstream) } @@ -572,8 +563,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.join(other.dstream, numPartitions) } @@ -585,8 +575,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.join(other.dstream, partitioner) } @@ -596,8 +585,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * number of partitions. */ def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.leftOuterJoin(other.dstream) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -611,8 +599,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -625,8 +612,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -652,8 +638,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -667,8 +652,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -748,8 +732,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( new JavaDStream[(K, V)](dstream) } - override val classTag: ClassTag[(K, V)] = - implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]] + override val classTag: ClassTag[(K, V)] = fakeClassTag } object JavaPairDStream { @@ -758,10 +741,8 @@ object JavaPairDStream { } def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { - implicit val cmk: ClassTag[K] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val cmv: ClassTag[V] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + implicit val cmk: ClassTag[K] = fakeClassTag + implicit val cmv: ClassTag[V] = fakeClassTag new JavaPairDStream[K, V](dstream.dstream) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index b082bb0585..c48d754e43 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -187,7 +187,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { converter: JFunction[InputStream, java.lang.Iterable[T]], storageLevel: StorageLevel) : JavaDStream[T] = { - def fn = (x: InputStream) => converter.apply(x).toIterator + def fn = (x: InputStream) => converter.call(x).toIterator implicit val cmt: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.socketStream(hostname, port, fn, storageLevel) @@ -431,7 +431,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * In the transform function, convert the JavaRDD corresponding to that JavaDStream to * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD(). */ - def transform[K, V]( + def transformToPair[K, V]( dstreams: JList[JavaDStream[_]], transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]] ): JavaPairDStream[K, V] = { diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 54a0791d04..e93bf18b6d 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -247,14 +247,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } - private class IntegerSum extends Function2<Integer, Integer, Integer> { + private class IntegerSum implements Function2<Integer, Integer, Integer> { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } } - private class IntegerDifference extends Function2<Integer, Integer, Integer> { + private class IntegerDifference implements Function2<Integer, Integer, Integer> { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 - i2; @@ -392,7 +392,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, Integer> transformed3 = stream.transform( + JavaPairDStream<String, Integer> transformed3 = stream.transformToPair( new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception { return null; @@ -400,7 +400,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, Integer> transformed4 = stream.transform( + JavaPairDStream<String, Integer> transformed4 = stream.transformToPair( new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception { return null; @@ -424,7 +424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, String> pairTransformed3 = pairStream.transform( + JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair( new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception { return null; @@ -432,7 +432,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, String> pairTransformed4 = pairStream.transform( + JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair( new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { return null; @@ -482,7 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, stringStringKVStream2, 1); JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith( + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair( pairStream2, new Function3< JavaPairRDD<String, String>, @@ -551,7 +551,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> transformed3 = stream1.transformWith( + JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair( stream2, new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -561,7 +561,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> transformed4 = stream1.transformWith( + JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair( pairStream1, new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -591,7 +591,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith( + JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair( stream2, new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -601,7 +601,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith( + JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair( pairStream2, new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -656,7 +656,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<JavaDStream<?>> listOfDStreams2 = Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); - JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform( + JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( listOfDStreams2, new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() { public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) { @@ -671,7 +671,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa return new Tuple2<Integer, Integer>(i, i); } }; - return rdd1.union(rdd2).map(mapToTuple).join(prdd3); + return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); } } ); @@ -742,17 +742,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Tuple2<Integer, String>(9, "s"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<Integer,String> flatMapped = stream.flatMap( - new PairFlatMapFunction<String, Integer, String>() { - @Override - public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2<Integer, String>(in.length(), letter)); - } - return out; + JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair( + new PairFlatMapFunction<String, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { + List<Tuple2<Integer, String>> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2<Integer, String>(in.length(), letter)); } - }); + return out; + } + }); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -816,7 +816,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = stream.map( + JavaPairDStream<String, Integer> pairStream = stream.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String in) throws Exception { @@ -880,7 +880,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.map( + JavaPairDStream<Integer, String> reversed = pairStream.mapToPair( new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { @@ -913,7 +913,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions( + JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair( new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception { @@ -983,7 +983,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap( + JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair( new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception { @@ -1228,7 +1228,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); + pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), + new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1300,7 +1301,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, Integer> sorted = pairStream.transform( + JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair( new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() { @Override public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { @@ -1632,7 +1633,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testSocketString() { - class Converter extends Function<InputStream, Iterable<String>> { + + class Converter implements Function<InputStream, Iterable<String>> { public Iterable<String> call(InputStream in) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); List<String> out = new ArrayList<String>(); |