From 9d49a6b03fb91d516bf40e50f67e87155c69dba1 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 16 Feb 2013 15:56:04 -0800 Subject: Use RDD type for `foreach` operator in Java. --- streaming/src/test/java/spark/streaming/JavaAPISuite.java | 1 + streaming/src/test/java/spark/streaming/JavaTestUtils.scala | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 5d510fd89f..4fe2de5a1a 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -12,6 +12,7 @@ import org.junit.Test; import scala.Tuple2; import spark.HashPartitioner; import spark.api.java.JavaRDD; +import spark.api.java.JavaRDDLike; import spark.api.java.JavaSparkContext; import spark.api.java.function.*; import spark.storage.StorageLevel; diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 52ea28732a..64a7e7cbf9 100644 --- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase { * Attach a provided stream to it's associated StreamingContext as a * [[spark.streaming.TestOutputStream]]. **/ - def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( - dstream: JavaDStreamLike[T, This]) = { + def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R], + R <: spark.api.java.JavaRDDLike[T, R]]( + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] val ostream = new TestOutputStream(dstream.dstream, -- cgit v1.2.3 From 35880de42edb30cf705036083710c85a74a351fa Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 16 Feb 2013 16:36:12 -0800 Subject: Use RDD type for `transform` operator in Java. This is an improved implementation of the `transform` operator in Java. The main difference is that this allows all four possible types of transform functions 1. JavaRDD -> JavaRDD 2. JavaRDD -> JavaPairRDD 3. JavaPairRDD -> JavaPairRDD 4. JavaPairRDD -> JavaRDD whereas previously only (1) and (3) were possible. Conflicts: streaming/src/test/java/spark/streaming/JavaAPISuite.java --- .../spark/streaming/api/java/JavaDStreamLike.scala | 40 ++++++++-- .../test/java/spark/streaming/JavaAPISuite.java | 89 +++++++++++++++++++++- 2 files changed, 122 insertions(+), 7 deletions(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index 4e1458ca9e..f7b1704884 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -6,7 +6,7 @@ import java.lang.{Long => JLong} import scala.collection.JavaConversions._ import spark.streaming._ -import spark.api.java.{JavaRDDLike, JavaRDD} +import spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD} import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} import java.util import spark.RDD @@ -239,11 +239,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. */ - def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { + def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] def scalaTransform (in: RDD[T]): RDD[U] = - transformFunc.call(new JavaRDD[T](in)).rdd + transformFunc.call(wrapRDD(in)).rdd dstream.transform(scalaTransform(_)) } @@ -251,11 +251,41 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. */ - def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = { + def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] def scalaTransform (in: RDD[T], time: Time): RDD[U] = - transformFunc.call(new JavaRDD[T](in), time).rdd + transformFunc.call(wrapRDD(in), time).rdd + dstream.transform(scalaTransform(_, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[T]): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(in)).rdd + dstream.transform(scalaTransform(_)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(in), time).rdd dstream.transform(scalaTransform(_, _)) } diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 4fe2de5a1a..9be680dbdc 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -294,8 +294,9 @@ public class JavaAPISuite implements Serializable { Arrays.asList(6,7,8), Arrays.asList(9,10,11)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream transformed = stream.transform(new Function, JavaRDD>() { + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream transformed = + stream.transform(new Function, JavaRDD>() { @Override public JavaRDD call(JavaRDD in) throws Exception { return in.map(new Function() { @@ -742,6 +743,90 @@ public class JavaAPISuite implements Serializable { } @Test + public void testPairTransform() { + List>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2(3, 5), + new Tuple2(1, 5), + new Tuple2(4, 5), + new Tuple2(2, 5)), + Arrays.asList( + new Tuple2(2, 5), + new Tuple2(3, 5), + new Tuple2(4, 5), + new Tuple2(1, 5))); + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2(1, 5), + new Tuple2(2, 5), + new Tuple2(3, 5), + new Tuple2(4, 5)), + Arrays.asList( + new Tuple2(1, 5), + new Tuple2(2, 5), + new Tuple2(3, 5), + new Tuple2(4, 5))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream sorted = pairStream.transform( + new Function, JavaPairRDD>() { + @Override + public JavaPairRDD call(JavaPairRDD in) throws Exception { + return in.sortByKey(); + } + }); + + JavaTestUtils.attachTestOutputStream(sorted); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairToNormalRDDTransform() { + List>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2(3, 5), + new Tuple2(1, 5), + new Tuple2(4, 5), + new Tuple2(2, 5)), + Arrays.asList( + new Tuple2(2, 5), + new Tuple2(3, 5), + new Tuple2(4, 5), + new Tuple2(1, 5))); + + List> expected = Arrays.asList( + Arrays.asList(3,1,4,2), + Arrays.asList(2,3,4,1)); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaDStream firstParts = pairStream.transform( + new Function, JavaRDD>() { + @Override + public JavaRDD call(JavaPairRDD in) throws Exception { + return in.map(new Function, Integer>() { + @Override + public Integer call(Tuple2 in) { + return in._1(); + } + }); + } + }); + + JavaTestUtils.attachTestOutputStream(firstParts); + List> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + public void testMapValues() { List>> inputData = stringStringKVStream; -- cgit v1.2.3 From 041c19e5f0309e7a667faab5fee9f9081db58237 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 19 Feb 2013 08:44:20 -0800 Subject: Small changes that were missing in merge --- streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 1 + streaming/src/test/java/spark/streaming/JavaAPISuite.java | 1 + 2 files changed, 2 insertions(+) (limited to 'streaming/src/test/java') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index de3e802300..c1c8783559 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -15,6 +15,7 @@ import org.apache.hadoop.conf.Configuration import spark.api.java.JavaPairRDD import spark.storage.StorageLevel import com.google.common.base.Optional +import spark.RDD class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val kManifiest: ClassManifest[K], diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 9be680dbdc..53fac14386 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -13,6 +13,7 @@ import scala.Tuple2; import spark.HashPartitioner; import spark.api.java.JavaRDD; import spark.api.java.JavaRDDLike; +import spark.api.java.JavaPairRDD; import spark.api.java.JavaSparkContext; import spark.api.java.function.*; import spark.storage.StorageLevel; -- cgit v1.2.3