Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala          |   2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala      | 119
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala      | 101
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala |   4
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                    |  62
5 files changed, 130 insertions, 158 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index e23b725052..721d502732 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -41,7 +41,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
- dstream.filter((x => f(x).booleanValue()))
+ dstream.filter((x => f.call(x).booleanValue()))
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaDStream[T] = dstream.cache()
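
With Function now a plain interface invoked through call() (the filter change above), Java callers implement call() explicitly. A minimal usage sketch, not part of the patch, assuming an existing JavaDStream<String> named lines:

    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;

    // Keep only non-empty lines; filter(...) now invokes f.call(x) internally.
    JavaDStream<String> nonEmpty = lines.filter(
        new Function<String, Boolean>() {
          @Override
          public Boolean call(String s) throws Exception {
            return !s.isEmpty();
          }
        });
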
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 7aa7ead29b..a85cd04c93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,19 +17,20 @@
package org.apache.spark.streaming.api.java
-import java.util.{List => JList}
+import java.util
import java.lang.{Long => JLong}
+import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import org.apache.spark.streaming._
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
-import java.util
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _}
import org.apache.spark.rdd.RDD
-import JavaDStream._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api.java.JavaDStream._
import org.apache.spark.streaming.dstream.DStream
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
@@ -123,23 +124,23 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
- def glom(): JavaDStream[JList[T]] = {
+ def glom(): JavaDStream[JList[T]] =
new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
- }
+
/** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */
- def context(): StreamingContext = dstream.context()
+ def context(): StreamingContext = dstream.context
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[R](f: JFunction[T, R]): JavaDStream[R] = {
- new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+ new JavaDStream(dstream.map(f)(fakeClassTag))(fakeClassTag)
}
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
- new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
+ def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def cm: ClassTag[(K2, V2)] = fakeClassTag
+ new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -148,19 +149,19 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType())
+ def fn = (x: T) => f.call(x).asScala
+ new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
- new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
+ def fn = (x: T) => f.call(x).asScala
+ def cm: ClassTag[(K2, V2)] = fakeClassTag
+ new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -169,8 +170,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of the RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
/**
@@ -178,10 +179,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
- def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
- new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
+ def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
/**
@@ -283,8 +284,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cm: ClassTag[U] = fakeClassTag
+
def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -295,8 +296,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cm: ClassTag[U] = fakeClassTag
+
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@ -306,12 +307,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
+ def transformToPair[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmk: ClassTag[K2] = fakeClassTag
+ implicit val cmv: ClassTag[V2] = fakeClassTag
+
def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@ -321,12 +321,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
- def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
+ def transformToPair[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
- implicit val cmk: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmk: ClassTag[K2] = fakeClassTag
+ implicit val cmv: ClassTag[V2] = fakeClassTag
+
def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@ -340,10 +339,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
): JavaDStream[W] = {
- implicit val cmu: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
- implicit val cmv: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cmu: ClassTag[U] = fakeClassTag
+ implicit val cmv: ClassTag[W] = fakeClassTag
+
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
@@ -353,16 +351,13 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[U, K2, V2](
+ def transformWithToPair[U, K2, V2](
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
): JavaPairDStream[K2, V2] = {
- implicit val cmu: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
- implicit val cmk2: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv2: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ implicit val cmu: ClassTag[U] = fakeClassTag
+ implicit val cmk2: ClassTag[K2] = fakeClassTag
+ implicit val cmv2: ClassTag[V2] = fakeClassTag
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
@@ -376,12 +371,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
): JavaDStream[W] = {
- implicit val cmk2: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv2: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
- implicit val cmw: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cmk2: ClassTag[K2] = fakeClassTag
+ implicit val cmv2: ClassTag[V2] = fakeClassTag
+ implicit val cmw: ClassTag[W] = fakeClassTag
+
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
@@ -391,18 +384,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream and 'other' DStream.
*/
- def transformWith[K2, V2, K3, V3](
+ def transformWithToPair[K2, V2, K3, V3](
other: JavaPairDStream[K2, V2],
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
): JavaPairDStream[K3, V3] = {
- implicit val cmk2: ClassTag[K2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
- implicit val cmv2: ClassTag[V2] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
- implicit val cmk3: ClassTag[K3] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
- implicit val cmv3: ClassTag[V3] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
+ implicit val cmk2: ClassTag[K2] = fakeClassTag
+ implicit val cmv2: ClassTag[V2] = fakeClassTag
+ implicit val cmk3: ClassTag[K3] = fakeClassTag
+ implicit val cmv3: ClassTag[V3] = fakeClassTag
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
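
The renames above (map, flatMap, mapPartitions, transform, and transformWith become mapToPair, flatMapToPair, mapPartitionsToPair, transformToPair, and transformWithToPair when a Tuple2 is produced) remove the overloads that Java callers could not cleanly distinguish. A usage sketch, not part of the patch, assuming a JavaDStream<String> named words:

    import scala.Tuple2;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    // Pair-producing transformations now have distinct names: mapToPair, not map.
    JavaPairDStream<String, Integer> counts = words
        .mapToPair(new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String w) throws Exception {
            return new Tuple2<String, Integer>(w, 1);
          }
        })
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) throws Exception {
            return a + b;
          }
        });
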
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2c7ff87744..ac451d1913 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,24 +17,25 @@
package org.apache.spark.streaming.api.java
-import java.util.{List => JList}
import java.lang.{Long => JLong}
+import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3}
-import org.apache.spark.Partitioner
+import com.google.common.base.Optional
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD}
-import org.apache.spark.storage.StorageLevel
-import com.google.common.base.Optional
+import org.apache.spark.Partitioner
+import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.PairRDDFunctions
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
/**
@@ -54,7 +55,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
- dstream.filter((x => f(x).booleanValue()))
+ dstream.filter((x => f.call(x).booleanValue()))
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaPairDStream[K, V] = dstream.cache()
@@ -168,8 +169,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner
): JavaPairDStream[K, C] = {
- implicit val cm: ClassTag[C] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ implicit val cm: ClassTag[C] = fakeClassTag
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
@@ -184,8 +184,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
partitioner: Partitioner,
mapSideCombine: Boolean
): JavaPairDStream[K, C] = {
- implicit val cm: ClassTag[C] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ implicit val cm: ClassTag[C] = fakeClassTag
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
}
@@ -279,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream's batching interval
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
):JavaPairDStream[K, V] = {
@@ -299,7 +298,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
@@ -320,7 +319,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream.
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
@@ -345,8 +344,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream's batching interval
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- invReduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
+ invReduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
): JavaPairDStream[K, V] = {
@@ -374,8 +373,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- invReduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
+ invReduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int,
@@ -412,8 +411,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* set this to null if you do not want to filter
*/
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- invReduceFunc: Function2[V, V, V],
+ reduceFunc: JFunction2[V, V, V],
+ invReduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner,
@@ -453,8 +452,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
- implicit val cm: ClassTag[S] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+ implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}
@@ -471,8 +469,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
- implicit val cm: ClassTag[S] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+ implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
}
@@ -490,8 +487,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
- implicit val cm: ClassTag[S] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+ implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
}
@@ -501,8 +497,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* 'this' DStream without changing the key.
*/
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
- implicit val cm: ClassTag[U] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+ implicit val cm: ClassTag[U] = fakeClassTag
dstream.mapValues(f)
}
@@ -524,8 +519,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -537,8 +531,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, numPartitions)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -551,8 +544,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, partitioner)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@@ -562,8 +554,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream)
}
@@ -572,8 +563,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream, numPartitions)
}
@@ -585,8 +575,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
dstream.join(other.dstream, partitioner)
}
@@ -596,8 +585,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* number of partitions.
*/
def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -611,8 +599,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -625,8 +612,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (V, Optional[W])] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
@@ -652,8 +638,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
numPartitions: Int
): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -667,8 +652,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
other: JavaPairDStream[K, W],
partitioner: Partitioner
): JavaPairDStream[K, (Optional[V], W)] = {
- implicit val cm: ClassTag[W] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ implicit val cm: ClassTag[W] = fakeClassTag
val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
@@ -748,8 +732,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
new JavaDStream[(K, V)](dstream)
}
- override val classTag: ClassTag[(K, V)] =
- implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+ override val classTag: ClassTag[(K, V)] = fakeClassTag
}
object JavaPairDStream {
@@ -758,10 +741,8 @@ object JavaPairDStream {
}
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
- implicit val cmk: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val cmv: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+ implicit val cmk: ClassTag[K] = fakeClassTag
+ implicit val cmv: ClassTag[V] = fakeClassTag
new JavaPairDStream[K, V](dstream.dstream)
}
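
A sketch of the tightened windowed-reduce signatures above, where Function2 is now explicitly org.apache.spark.api.java.function.Function2 rather than an unqualified scala.Function2. Not part of the patch; assumes a JavaPairDStream<String, Integer> named counts whose batch interval divides both durations:

    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Duration;

    Function2<Integer, Integer, Integer> sum =
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) throws Exception {
            return a + b;
          }
        };
    // A 30 second window, recomputed every 10 seconds.
    JavaPairDStream<String, Integer> windowed =
        counts.reduceByKeyAndWindow(sum, new Duration(30000), new Duration(10000));
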
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b082bb0585..c48d754e43 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -187,7 +187,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
: JavaDStream[T] = {
- def fn = (x: InputStream) => converter.apply(x).toIterator
+ def fn = (x: InputStream) => converter.call(x).toIterator
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.socketStream(hostname, port, fn, storageLevel)
@@ -431,7 +431,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
* a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
- def transform[K, V](
+ def transformToPair[K, V](
dstreams: JList[JavaDStream[_]],
transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
): JavaPairDStream[K, V] = {
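
The socketStream converter above is likewise invoked through call() rather than apply(). A sketch, not part of the patch, with hypothetical host and port, assuming a JavaStreamingContext named jssc:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.StorageLevels;
    import org.apache.spark.api.java.function.Function;

    // Convert each socket InputStream into an iterable of lines.
    JavaDStream<String> socketLines = jssc.socketStream(
        "localhost", 9999,
        new Function<InputStream, Iterable<String>>() {
          @Override
          public Iterable<String> call(InputStream in) throws Exception {
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            List<String> out = new ArrayList<String>();
            String line;
            while ((line = reader.readLine()) != null) {
              out.add(line);
            }
            return out;
          }
        },
        StorageLevels.MEMORY_ONLY_SER);
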
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 54a0791d04..e93bf18b6d 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -247,14 +247,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
- private class IntegerSum extends Function2<Integer, Integer, Integer> {
+ private class IntegerSum implements Function2<Integer, Integer, Integer> {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
}
- private class IntegerDifference extends Function2<Integer, Integer, Integer> {
+ private class IntegerDifference implements Function2<Integer, Integer, Integer> {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 - i2;
@@ -392,7 +392,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, Integer> transformed3 = stream.transform(
+ JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(
new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
return null;
@@ -400,7 +400,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, Integer> transformed4 = stream.transform(
+ JavaPairDStream<String, Integer> transformed4 = stream.transformToPair(
new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
return null;
@@ -424,7 +424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, String> pairTransformed3 = pairStream.transform(
+ JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
@Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
return null;
@@ -432,7 +432,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, String> pairTransformed4 = pairStream.transform(
+ JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair(
new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
@Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
return null;
@@ -482,7 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
- JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
pairStream2,
new Function3<
JavaPairRDD<String, String>,
@@ -551,7 +551,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> transformed3 = stream1.transformWith(
+ JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair(
stream2,
new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -561,7 +561,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> transformed4 = stream1.transformWith(
+ JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair(
pairStream1,
new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -591,7 +591,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith(
+ JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair(
stream2,
new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -601,7 +601,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
+ JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair(
pairStream2,
new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -656,7 +656,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<JavaDStream<?>> listOfDStreams2 =
Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
- JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform(
+ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
listOfDStreams2,
new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
@@ -671,7 +671,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
return new Tuple2<Integer, Integer>(i, i);
}
};
- return rdd1.union(rdd2).map(mapToTuple).join(prdd3);
+ return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
}
}
);
@@ -742,17 +742,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<Integer, String>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
- new PairFlatMapFunction<String, Integer, String>() {
- @Override
- public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
- for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<Integer, String>(in.length(), letter));
- }
- return out;
+ JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
+ new PairFlatMapFunction<String, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(in.length(), letter));
}
- });
+ return out;
+ }
+ });
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -816,7 +816,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = stream.map(
+ JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String in) throws Exception {
@@ -880,7 +880,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.map(
+ JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
@@ -913,7 +913,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions(
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
@@ -983,7 +983,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
@@ -1228,7 +1228,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1300,7 +1301,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, inputData, 1);
JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
@Override
public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
@@ -1632,7 +1633,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testSocketString() {
- class Converter extends Function<InputStream, Iterable<String>> {
+
+ class Converter implements Function<InputStream, Iterable<String>> {
public Iterable<String> call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
List<String> out = new ArrayList<String>();
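
The Converter change above is the same source-level migration applied throughout the suite: Function and Function2 are now interfaces, so helper classes switch from extends to implements. A sketch of the pattern (class name illustrative):

    import org.apache.spark.api.java.function.Function2;

    // Before this patch: "extends Function2<...>"; after: "implements".
    class IntegerProduct implements Function2<Integer, Integer, Integer> {
      @Override
      public Integer call(Integer a, Integer b) {
        return a * b;
      }
    }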