diff options
Diffstat (limited to 'streaming')
6 files changed, 21 insertions, 11 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala index 30985b4ebc..51efe6cae8 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -4,6 +4,7 @@ import spark.streaming.{Duration, Time, DStream} import spark.api.java.function.{Function => JFunction} import spark.api.java.JavaRDD import spark.storage.StorageLevel +import spark.RDD /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -26,7 +27,9 @@ import spark.storage.StorageLevel * - A function that is used to generate an RDD after each time interval */ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) - extends JavaDStreamLike[T, JavaDStream[T]] { + extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] { + + override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd) /** Return a new DStream containing only the elements that satisfy a predicate. */ def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index 1c1ba05ff9..4e1458ca9e 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -6,17 +6,20 @@ import java.lang.{Long => JLong} import scala.collection.JavaConversions._ import spark.streaming._ -import spark.api.java.JavaRDD +import spark.api.java.{JavaRDDLike, JavaRDD} import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} import java.util import spark.RDD import JavaDStream._ -trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable { +trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]] + extends Serializable { implicit val classManifest: ClassManifest[T] def dstream: DStream[T] + def wrapRDD(in: RDD[T]): R + implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = { in.map(new JLong(_)) } @@ -220,16 +223,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable * Apply a function to each RDD in this DStream. This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. */ - def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) { - dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd))) + def foreach(foreachFunc: JFunction[R, Void]) { + dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd))) } /** * Apply a function to each RDD in this DStream. This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. */ - def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) { - dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time)) + def foreach(foreachFunc: JFunction2[R, Time, Void]) { + dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) } /** diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index 952ca657bf..de3e802300 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -19,7 +19,9 @@ import com.google.common.base.Optional class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val kManifiest: ClassManifest[K], implicit val vManifest: ClassManifest[V]) - extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] { + extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] { + + override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) // ======================================================================= // Methods common to all DStream's diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index d9a676819a..878e179589 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -254,7 +254,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Registers an output stream that will be computed every interval */ - def registerOutputStream(outputStream: JavaDStreamLike[_, _]) { + def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) { ssc.registerOutputStream(outputStream.dstream) } diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 5d510fd89f..4fe2de5a1a 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -12,6 +12,7 @@ import org.junit.Test; import scala.Tuple2; import spark.HashPartitioner; import spark.api.java.JavaRDD; +import spark.api.java.JavaRDDLike; import spark.api.java.JavaSparkContext; import spark.api.java.function.*; import spark.storage.StorageLevel; diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 52ea28732a..64a7e7cbf9 100644 --- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase { * Attach a provided stream to it's associated StreamingContext as a * [[spark.streaming.TestOutputStream]]. **/ - def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( - dstream: JavaDStreamLike[T, This]) = { + def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R], + R <: spark.api.java.JavaRDDLike[T, R]]( + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] val ostream = new TestOutputStream(dstream.dstream, |