aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-03-03 22:31:30 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-03-03 22:31:30 -0800
commit181ec5030792a10f3ce77e997d0e2eda9bcd6139 (patch)
tree9b88504e5a3eca8177e4ebe1257ea9ce56120c13 /streaming/src/main
parentb14ede789abfabe25144385e8dc2fb96691aba81 (diff)
downloadspark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.gz
spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.bz2
spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.zip
[java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs
Author: Prashant Sharma <prashant.s@imaginea.com> Author: Patrick Wendell <pwendell@gmail.com> Closes #17 from ScrapCodes/java8-lambdas and squashes the following commits: 95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch. 85a954e [Prashant Sharma] Nit. import orderings. 673f7ac [Prashant Sharma] Added support for -java-home as well 80a13e8 [Prashant Sharma] Used fake class tag syntax 26eb3f6 [Prashant Sharma] Patrick's comments on PR. 35d8d79 [Prashant Sharma] Specified java 8 building in the docs 31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag. 4ab87d3 [Prashant Sharma] Review feedback on the pr c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala119
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala101
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala4
4 files changed, 98 insertions, 128 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index e23b725052..721d502732 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -41,7 +41,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
- dstream.filter((x => f(x).booleanValue()))
+ dstream.filter((x => f.call(x).booleanValue()))
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaDStream[T] = dstream.cache()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 7aa7ead29b..a85cd04c93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,19 +17,20 @@
package org.apache.spark.streaming.api.java
-import java.util.{List => JList}
+import java.util
import java.lang.{Long => JLong}
+import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import org.apache.spark.streaming._
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
-import java.util
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _}
import org.apache.spark.rdd.RDD
-import JavaDStream._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api.java.JavaDStream._
import org.apache.spark.streaming.dstream.DStream
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
@@ -123,23 +124,23 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
- def glom(): JavaDStream[JList[T]] = {
+ def glom(): JavaDStream[JList[T]] =
new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
- }
+
/** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */
- def context(): StreamingContext = dstream.context()
+ def context(): StreamingContext = dstream.context
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[R](f: JFunction[T, R]): JavaDStream[R] = {
- new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+ new JavaDStream(dstream.map(f)(fakeClassTag))(fakeClassTag)
}
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
- new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
+ def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def cm: ClassTag[(K2, V2)] = fakeClassTag
+ new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -148,19 +149,19 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType())
+ def fn = (x: T) => f.call(x).asScala
+ new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
- new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
+ def fn = (x: T) => f.call(x).asScala
+ def cm: ClassTag[(K2, V2)] = fakeClassTag
+ new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -169,8 +170,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of the RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
@@ -178,10 +179,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
- def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -283,8 +284,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cm: ClassTag[U] = fakeClassTag
+
def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -295,8 +296,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cm: ClassTag[U] = fakeClassTag
+
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@ -306,12 +307,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
+ def transformToPair[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmk: ClassTag[K2] = fakeClassTag
+ implicit val cmv: ClassTag[V2] = fakeClassTag
+
def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -321,12 +321,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
+ def transformToPair[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmk: ClassTag[K2] = fakeClassTag
+ implicit val cmv: ClassTag[V2] = fakeClassTag
+
def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@ -340,10 +339,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
): JavaDStream[W] = {
- implicit val cmu: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
- implicit val cmv: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cmu: ClassTag[U] = fakeClassTag
+ implicit val cmv: ClassTag[W] = fakeClassTag
+
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
@@ -353,16 +351,13 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[U, K2, V2](
+ def transformWithToPair[U, K2, V2](
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
): JavaPairDStream[K2, V2] = {
- implicit val cmu: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
- implicit val cmk2: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv2: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmu: ClassTag[U] = fakeClassTag
+ implicit val cmk2: ClassTag[K2] = fakeClassTag
+ implicit val cmv2: ClassTag[V2] = fakeClassTag
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
@@ -376,12 +371,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
): JavaDStream[W] = {
- implicit val cmk2: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv2: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
- implicit val cmw: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cmk2: ClassTag[K2] = fakeClassTag
+ implicit val cmv2: ClassTag[V2] = fakeClassTag
+ implicit val cmw: ClassTag[W] = fakeClassTag
+
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
@@ -391,18 +384,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[K2, V2, K3, V3](
+ def transformWithToPair[K2, V2, K3, V3](
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
): JavaPairDStream[K3, V3] = {
- implicit val cmk2: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv2: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
- implicit val cmk3: ClassTag[K3] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
- implicit val cmv3: ClassTag[V3] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
+ implicit val cmk2: ClassTag[K2] = fakeClassTag
+ implicit val cmv2: ClassTag[V2] = fakeClassTag
+ implicit val cmk3: ClassTag[K3] = fakeClassTag
+ implicit val cmv3: ClassTag[V3] = fakeClassTag
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2c7ff87744..ac451d1913 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,24 +17,25 @@
package org.apache.spark.streaming.api.java
-import java.util.{List => JList}
import java.lang.{Long => JLong}
+import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3}
-import org.apache.spark.Partitioner
+import com.google.common.base.Optional
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD}
-import org.apache.spark.storage.StorageLevel
-import com.google.common.base.Optional
+import org.apache.spark.Partitioner
+import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.PairRDDFunctions
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
/**
@@ -54,7 +55,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
- dstream.filter((x => f(x).booleanValue()))
+ dstream.filter((x => f.call(x).booleanValue()))
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaPairDStream[K, V] = dstream.cache()
@@ -168,8 +169,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner
): JavaPairDStream[K, C] = {
- implicit val cm: ClassTag[C] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ implicit val cm: ClassTag[C] = fakeClassTag
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
@@ -184,8 +184,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
partitioner: Partitioner,
mapSideCombine: Boolean
): JavaPairDStream[K, C] = {
- implicit val cm: ClassTag[C] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ implicit val cm: ClassTag[C] = fakeClassTag
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
}
@@ -279,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream's batching interval
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
):JavaPairDStream[K, V] = {
@@ -299,7 +298,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
@@ -320,7 +319,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
@@ -345,8 +344,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream's batching interval
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- invReduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
+ invReduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
): JavaPairDStream[K, V] = {
@@ -374,8 +373,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- invReduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
+ invReduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int,
@@ -412,8 +411,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- invReduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
+ invReduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner,
@@ -453,8 +452,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
- implicit val cm: ClassTag[S] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+ implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}
@@ -471,8 +469,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
- implicit val cm: ClassTag[S] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+ implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
}
@@ -490,8 +487,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
- implicit val cm: ClassTag[S] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+ implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
}
@@ -501,8 +497,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* 'this' DStream without changing the key.
*/
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
- implicit val cm: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cm: ClassTag[U] = fakeClassTag
dstream.mapValues(f)
}
@@ -524,8 +519,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -537,8 +531,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, numPartitions)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -551,8 +544,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, partitioner)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -562,8 +554,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream)
}
@@ -572,8 +563,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream, numPartitions)
}
@@ -585,8 +575,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream, partitioner)
}
@@ -596,8 +585,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* number of partitions.
*/
def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -611,8 +599,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -625,8 +612,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -652,8 +638,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -667,8 +652,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -748,8 +732,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
new JavaDStream[(K, V)](dstream)
}
- override val classTag: ClassTag[(K, V)] =
- implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+ override val classTag: ClassTag[(K, V)] = fakeClassTag
}
object JavaPairDStream {
@@ -758,10 +741,8 @@ object JavaPairDStream {
}
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
- implicit val cmk: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val cmv: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ implicit val cmk: ClassTag[K] = fakeClassTag
+ implicit val cmv: ClassTag[V] = fakeClassTag
new JavaPairDStream[K, V](dstream.dstream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b082bb0585..c48d754e43 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -187,7 +187,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
: JavaDStream[T] = {
- def fn = (x: InputStream) => converter.apply(x).toIterator
+ def fn = (x: InputStream) => converter.call(x).toIterator
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.socketStream(hostname, port, fn, storageLevel)
@@ -431,7 +431,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
* a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
- def transform[K, V](
+ def transformToPair[K, V](
dstreams: JList[JavaDStream[_]],
transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
): JavaPairDStream[K, V] = {