author    Tathagata Das <tathagata.das1565@gmail.com>  2013-10-24 10:56:24 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-10-24 10:56:24 -0700
commit    bacfe5ebca8e82317a7596c9fcbf95331c7038a9
tree      a61f11e9c05179b883a767b6b485f562d30dff3f
parent    9fccb17a5f168994fd1ca33199fb43ecbc2bbd92
Added JavaStreamingContext.transform
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala              |   4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala      |   5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala |  50
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                    | 132
4 files changed, 158 insertions(+), 33 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 3c466ade93..70bc25070a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -474,10 +474,10 @@ class StreamingContext private (
* the DStreams.
*/
def transform[T: ClassManifest](
- streams: Seq[DStream[_]],
+ dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = {
- new TransformedDStream[T](streams, sparkContext.clean(transformFunc))
+ new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 309c0fa24b..4dd6b7d096 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -711,6 +711,11 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
+ /** Convert this stream to a JavaDStream of (K, V) tuples. */
+ def toJavaDStream(): JavaDStream[(K, V)] = {
+ new JavaDStream[(K, V)](dstream)
+ }
+
override val classManifest: ClassManifest[(K, V)] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
}
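
For context, a minimal sketch of how the new toJavaDStream() is meant to be used from Java (the class and method names below are illustrative, not part of this patch): it re-wraps the same underlying DStream[(K, V)] so a pair stream can be placed in the List<JavaDStream<?>> that the new JavaStreamingContext.transform accepts.

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

class ToJavaDStreamSketch {
  // Erase the pair-specific view; the underlying DStream[(K, V)] is unchanged.
  static JavaDStream<Tuple2<String, Integer>> asPlainStream(
      JavaPairDStream<String, Integer> pairs) {
    return pairs.toJavaDStream();
  }
}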
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index a4b1670cd4..cf30b541e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -33,7 +33,7 @@ import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy}
@@ -617,6 +617,54 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
+ * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+ * the DStreams. The order of the JavaRDDs in the transform function parameter will be the
+ * same as the order of the corresponding DStreams in the list. To include a
+ * JavaPairDStream in the list of JavaDStreams, first convert it to a JavaDStream using
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
+ * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
+ * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ */
+ def transform[T](
+ dstreams: JList[JavaDStream[_]],
+ transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]]
+ ): JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val scalaDStreams = dstreams.map(_.dstream).toSeq
+ val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
+ transformFunc.call(jrdds, time).rdd
+ }
+ ssc.transform(scalaDStreams, scalaTransformFunc)
+ }
+
+ /**
+ * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+ * the DStreams. The order of the JavaRDDs in the transform function parameter will be the
+ * same as the order of the corresponding DStreams in the list. To include a
+ * JavaPairDStream in the list of JavaDStreams, first convert it to a JavaDStream using
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
+ * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
+ * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ */
+ def transform[K, V](
+ dstreams: JList[JavaDStream[_]],
+ transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
+ ): JavaPairDStream[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ val scalaDStreams = dstreams.map(_.dstream).toSeq
+ val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList
+ transformFunc.call(jrdds, time).rdd
+ }
+ ssc.transform(scalaDStreams, scalaTransformFunc)
+ }
+
+ /**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
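
To make the round trip concrete, here is a minimal Java sketch of calling the new transform (hypothetical names; it mirrors the test added below): a JavaPairDStream enters the list via toJavaDStream(), and inside the transform function its RDD is recovered as a JavaPairRDD via JavaPairRDD.fromJavaRDD().

import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

class TransformSketch {
  static JavaPairDStream<Integer, Tuple2<Integer, String>> joinWithPairs(
      JavaStreamingContext jssc,
      JavaDStream<Integer> ints,
      JavaPairDStream<Integer, String> pairs) {
    // The pair stream enters the list through toJavaDStream().
    List<JavaDStream<?>> inputs =
        Arrays.<JavaDStream<?>>asList(ints, pairs.toJavaDStream());
    return jssc.transform(
        inputs,
        new Function2<List<JavaRDD<?>>, Time,
            JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
          public JavaPairRDD<Integer, Tuple2<Integer, String>> call(
              List<JavaRDD<?>> rdds, Time time) {
            // RDDs arrive in the same order as the DStreams in `inputs`.
            JavaRDD<Integer> intRdd = (JavaRDD<Integer>) rdds.get(0);
            // Recover the pair view of the second RDD.
            JavaPairRDD<Integer, String> pairRdd = JavaPairRDD.fromJavaRDD(
                (JavaRDD<Tuple2<Integer, String>>) rdds.get(1));
            // Pair each integer with itself, then join on the integer key.
            return intRdd.map(new PairFunction<Integer, Integer, Integer>() {
              public Tuple2<Integer, Integer> call(Integer i) {
                return new Tuple2<Integer, Integer>(i, i);
              }
            }).join(pairRdd);
          }
        });
  }
}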
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2f92421367..f588afe90c 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -292,8 +293,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9));
JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
- JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3));
- JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6));
+ JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1, 2, 3));
+ JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4, 5, 6));
JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9));
LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
@@ -331,7 +332,6 @@ public class JavaAPISuite implements Serializable {
}
});
}
- }
);
JavaTestUtils.attachTestOutputStream(transformed);
@@ -354,7 +354,8 @@ public class JavaAPISuite implements Serializable {
JavaDStream<Integer> transformed1 = stream.transform(
new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ @Override
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
return null;
}
}
@@ -421,51 +422,56 @@ public class JavaAPISuite implements Serializable {
@Test
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
- new Tuple2<String, String>("new york", "yankees")),
- Arrays.asList(new Tuple2<String, String>("california", "sharks"),
- new Tuple2<String, String>("new york", "rangers")));
+ Arrays.asList(
+ new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(
+ new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<String, String>("california", "giants"),
- new Tuple2<String, String>("new york", "mets")),
- Arrays.asList(new Tuple2<String, String>("california", "ducks"),
- new Tuple2<String, String>("new york", "islanders")));
+ Arrays.asList(
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("dodgers", "giants")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("yankees", "mets"))),
- Arrays.asList(
- new Tuple2<String, Tuple2<String, String>>("california",
- new Tuple2<String, String>("sharks", "ducks")),
- new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("rangers", "islanders"))));
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream1, 1);
+ ssc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream2, 1);
+ ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
pairStream2,
- new Function3 <
+ new Function3<
JavaPairRDD<String, String>,
JavaPairRDD<String, String>,
Time,
JavaPairRDD<String, Tuple2<String, String>>
>() {
- @Override public JavaPairRDD<String, Tuple2<String, String>> call(
+ @Override
+ public JavaPairRDD<String, Tuple2<String, String>> call(
JavaPairRDD<String, String> rdd1,
JavaPairRDD<String, String> rdd2,
Time time
- ) throws Exception {
+ ) throws Exception {
return rdd1.join(rdd2);
}
}
@@ -475,9 +481,9 @@ public class JavaAPISuite implements Serializable {
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
-
}
+
@Test
public void testVariousTransformWith() {
// tests whether all variations of transformWith can be called from Java
@@ -566,7 +572,6 @@ public class JavaAPISuite implements Serializable {
}
);
-
JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
pairStream2,
new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
@@ -578,7 +583,74 @@ public class JavaAPISuite implements Serializable {
);
}
- @Test
+ @Test
+ public void testStreamingContextTransform() {
+ List<List<Integer>> stream1input = Arrays.asList(
+ Arrays.asList(1),
+ Arrays.asList(2)
+ );
+
+ List<List<Integer>> stream2input = Arrays.asList(
+ Arrays.asList(3),
+ Arrays.asList(4)
+ );
+
+ List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
+ Arrays.asList(new Tuple2<Integer, String>(1, "x")),
+ Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+ );
+
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
+ Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+ );
+
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
+ JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
+ JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
+
+ List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+
+ // This is just to test whether this transform to a JavaDStream compiles
+ JavaDStream<Long> transformed1 = ssc.transform(
+ listOfDStreams1,
+ new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
+ public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
+ assert(listOfRDDs.size() == 2);
+ return null;
+ }
+ }
+ );
+
+ List<JavaDStream<?>> listOfDStreams2 =
+ Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+
+ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform(
+ listOfDStreams2,
+ new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
+ public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
+ assert(listOfRDDs.size() == 3);
+ JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
+ JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
+ JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
+ JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+ PairFunction<Integer, Integer, Integer> mapToTuple = new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i);
+ }
+ };
+ return rdd1.union(rdd2).map(mapToTuple).join(prdd3);
+ }
+ }
+ );
+ JavaTestUtils.attachTestOutputStream(transformed2);
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("go", "giants"),