aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-02-16 15:56:04 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-02-19 08:30:32 -0800
commit9d49a6b03fb91d516bf40e50f67e87155c69dba1 (patch)
tree4b019f70e3ab292d001008ff98f6e020da726eb7 /streaming
parent8b9c673fce1c733c7fcd8b978e84f943be9e9e35 (diff)
downloadspark-9d49a6b03fb91d516bf40e50f67e87155c69dba1.tar.gz
spark-9d49a6b03fb91d516bf40e50f67e87155c69dba1.tar.bz2
spark-9d49a6b03fb91d516bf40e50f67e87155c69dba1.zip
Use RDD type for `foreach` operator in Java.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala5
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala15
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala4
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala2
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java1
-rw-r--r--streaming/src/test/java/spark/streaming/JavaTestUtils.scala5
6 files changed, 21 insertions, 11 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 30985b4ebc..51efe6cae8 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -4,6 +4,7 @@ import spark.streaming.{Duration, Time, DStream}
import spark.api.java.function.{Function => JFunction}
import spark.api.java.JavaRDD
import spark.storage.StorageLevel
+import spark.RDD
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -26,7 +27,9 @@ import spark.storage.StorageLevel
* - A function that is used to generate an RDD after each time interval
*/
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
- extends JavaDStreamLike[T, JavaDStream[T]] {
+ extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
+
+ override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 1c1ba05ff9..4e1458ca9e 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -6,17 +6,20 @@ import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
import spark.streaming._
-import spark.api.java.JavaRDD
+import spark.api.java.{JavaRDDLike, JavaRDD}
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
import java.util
import spark.RDD
import JavaDStream._
-trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable {
+trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
+ extends Serializable {
implicit val classManifest: ClassManifest[T]
def dstream: DStream[T]
+ def wrapRDD(in: RDD[T]): R
+
implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
in.map(new JLong(_))
}
@@ -220,16 +223,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
- dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
+ def foreach(foreachFunc: JFunction[R, Void]) {
+ dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
- def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
- dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
+ def foreach(foreachFunc: JFunction2[R, Time, Void]) {
+ dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index 952ca657bf..de3e802300 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -19,7 +19,9 @@ import com.google.common.base.Optional
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifiest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
- extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
+ extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
+
+ override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
// =======================================================================
// Methods common to all DStream's
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index d9a676819a..878e179589 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -254,7 +254,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Registers an output stream that will be computed every interval
*/
- def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
+ def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
ssc.registerOutputStream(outputStream.dstream)
}
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 5d510fd89f..4fe2de5a1a 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -12,6 +12,7 @@ import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
import spark.api.java.JavaRDD;
+import spark.api.java.JavaRDDLike;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
import spark.storage.StorageLevel;
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 52ea28732a..64a7e7cbf9 100644
--- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase {
* Attach a provided stream to it's associated StreamingContext as a
* [[spark.streaming.TestOutputStream]].
**/
- def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
- dstream: JavaDStreamLike[T, This]) = {
+ def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R],
+ R <: spark.api.java.JavaRDDLike[T, R]](
+ dstream: JavaDStreamLike[T, This, R]) = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val ostream = new TestOutputStream(dstream.dstream,