From 3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 12 Feb 2013 13:57:57 -0800 Subject: STREAMING-50: Support transform workaround in JavaPairDStream This ports a useful workaround (the `transform` function) to JavaPairDStream. It is necessary to do things like sorting which are not supported yet in the core streaming API. --- .../spark/streaming/api/java/JavaPairDStream.scala | 34 +++++++++++++++- .../test/java/spark/streaming/JavaAPISuite.java | 45 ++++++++++++++++++++++ 2 files changed, 77 insertions(+), 2 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index ef10c091ca..eb2495e3ac 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -8,11 +8,11 @@ import scala.collection.JavaConversions._ import spark.streaming._ import spark.streaming.StreamingContext._ import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import spark.Partitioner +import spark.{RDD, Partitioner} import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.conf.Configuration -import spark.api.java.JavaPairRDD +import spark.api.java.{JavaRDD, JavaPairRDD} import spark.storage.StorageLevel import com.google.common.base.Optional @@ -81,6 +81,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = dstream.union(that.dstream) + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction[JavaPairRDD[K, V], JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[(K, V)]): RDD[(K2, V2)] = + transformFunc.call(new JavaPairRDD[K, V](in)).rdd + dstream.transform(scalaTransform(_)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction2[JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[(K, V)], time: Time): RDD[(K2, V2)] = + transformFunc.call(new JavaPairRDD[K, V](in), time).rdd + dstream.transform(scalaTransform(_, _)) + } + // ======================================================================= // Methods only for PairDStream's // ======================================================================= diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 9bfcd83e4d..7b385f609d 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -11,6 +11,7 @@ import org.junit.Before; import org.junit.Test; import scala.Tuple2; import spark.HashPartitioner; +import spark.api.java.JavaPairRDD; import spark.api.java.JavaRDD; import spark.api.java.JavaSparkContext; import spark.api.java.function.*; @@ -872,6 +873,50 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } + @Test + public void testPairTransform() { + List>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2(3, 5), + new Tuple2(1, 5), + new Tuple2(4, 5), + new Tuple2(2, 5)), + Arrays.asList( + new Tuple2(2, 5), + new Tuple2(3, 5), + new Tuple2(4, 5), + new Tuple2(1, 5))); + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2(1, 5), + new Tuple2(2, 5), + new Tuple2(3, 5), + new Tuple2(4, 5)), + Arrays.asList( + new Tuple2(1, 5), + new Tuple2(2, 5), + new Tuple2(3, 5), + new Tuple2(4, 5))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream sorted = pairStream.transform( + new Function, JavaPairRDD>() { + @Override + public JavaPairRDD call(JavaPairRDD in) throws Exception { + return in.sortByKey(); + } + }); + + JavaTestUtils.attachTestOutputStream(sorted); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + @Test public void testMapValues() { List>> inputData = stringStringKVStream; -- cgit v1.2.3