author     Tathagata Das <tathagata.das1565@gmail.com>    2013-02-14 21:30:55 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2013-02-14 21:30:55 -0800
commit     3bcc6e5c0395b7478bc19572cbef3958f13daf6e (patch)
tree       ec7a7981f7025d478a3656e2e2b2078cd3e1e5bb
parent     e8663e0fe5550d3ba0046ac2ba464524aef8d0cd (diff)
parent     3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef (diff)
Merge pull request #466 from pwendell/java-stream-transform
STREAMING-50: Support transform workaround in JavaPairDStream
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala  34
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java                45
2 files changed, 77 insertions(+), 2 deletions(-)
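
For illustration, here is a minimal sketch of how the new one-argument transform on JavaPairDStream might be called from Java, mirroring the test added below. The StreamingContext setup and the pairStream variable are assumed, not part of this patch:

    import spark.api.java.JavaPairRDD;
    import spark.api.java.function.Function;
    import spark.streaming.api.java.JavaPairDStream;

    // Apply an arbitrary pair-RDD-to-pair-RDD function to every batch of the
    // stream; here each batch is simply sorted by key.
    JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
        new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
          @Override
          public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
            return in.sortByKey();
          }
        });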
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ef10c091ca..eb2495e3ac 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -8,11 +8,11 @@ import scala.collection.JavaConversions._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import spark.Partitioner
+import spark.{RDD, Partitioner}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
-import spark.api.java.JavaPairRDD
+import spark.api.java.{JavaRDD, JavaPairRDD}
import spark.storage.StorageLevel
import com.google.common.base.Optional
@@ -81,6 +81,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
dstream.union(that.dstream)
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[K2, V2](transformFunc: JFunction[JavaPairRDD[K, V], JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform (in: RDD[(K, V)]): RDD[(K2, V2)] =
+ transformFunc.call(new JavaPairRDD[K, V](in)).rdd
+ dstream.transform(scalaTransform(_))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[K2, V2](transformFunc: JFunction2[JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]):
+ JavaPairDStream[K2, V2] = {
+ implicit val cmk: ClassManifest[K2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+ implicit val cmv: ClassManifest[V2] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+ def scalaTransform (in: RDD[(K, V)], time: Time): RDD[(K2, V2)] =
+ transformFunc.call(new JavaPairRDD[K, V](in), time).rdd
+ dstream.transform(scalaTransform(_, _))
+ }
+
// =======================================================================
// Methods only for PairDStream's
// =======================================================================
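
The second overload above also hands the batch time to the user function. A hypothetical sketch of calling it from Java, again assuming an existing pairStream; the time parameter is only noted in a comment here:

    import spark.api.java.JavaPairRDD;
    import spark.api.java.function.Function2;
    import spark.streaming.Time;
    import spark.streaming.api.java.JavaPairDStream;

    // Same per-batch transformation, but the batch Time is available inside
    // call() for time-dependent logic.
    JavaPairDStream<Integer, Integer> sortedByBatch = pairStream.transform(
        new Function2<JavaPairRDD<Integer, Integer>, Time, JavaPairRDD<Integer, Integer>>() {
          @Override
          public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in, Time time) throws Exception {
            // time identifies the batch being processed; this sketch just sorts by key.
            return in.sortByKey();
          }
        });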
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 9bfcd83e4d..7b385f609d 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -11,6 +11,7 @@ import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
+import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
@@ -873,6 +874,50 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testPairTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+ new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
+ @Override
+ public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ return in.sortByKey();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(sorted);
+ List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;