aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-20 11:06:01 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-20 11:06:01 -0800
commit972fe7714fb57385d14e4d776f70e0040d3a8f7e (patch)
tree5b16bdd3dd1b2bff6a8fea51a351fd6bd5fb28cc /streaming/src/test/java
parent334ab9244113e4b792fd51697ef80ab0d3b3de25 (diff)
parent991d3342fed1cf5626142bc90872f79618ea94c8 (diff)
downloadspark-972fe7714fb57385d14e4d776f70e0040d3a8f7e.tar.gz
spark-972fe7714fb57385d14e4d776f70e0040d3a8f7e.tar.bz2
spark-972fe7714fb57385d14e4d776f70e0040d3a8f7e.zip
Merge branch 'mesos-streaming' into streaming
Conflicts: streaming/src/test/java/spark/streaming/JavaAPISuite.java
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java47
-rw-r--r--streaming/src/test/java/spark/streaming/JavaTestUtils.scala5
2 files changed, 48 insertions, 4 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 17cd5ed795..4530af5f6a 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -13,6 +13,8 @@ import scala.Tuple2;
import spark.HashPartitioner;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
+import spark.api.java.JavaRDDLike;
+import spark.api.java.JavaPairRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
import spark.storage.StorageLevel;
@@ -294,8 +296,9 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(6,7,8),
Arrays.asList(9,10,11));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> transformed =
+ stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
@Override
public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
return in.map(new Function<Integer, Integer>() {
@@ -921,6 +924,46 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testPairToNormalRDDTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3,1,4,2),
+ Arrays.asList(2,3,4,1));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaDStream<Integer> firstParts = pairStream.transform(
+ new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
+ @Override
+ public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<Integer, Integer> in) {
+ return in._1();
+ }
+ });
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(firstParts);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 52ea28732a..64a7e7cbf9 100644
--- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase {
* Attach a provided stream to it's associated StreamingContext as a
* [[spark.streaming.TestOutputStream]].
**/
- def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
- dstream: JavaDStreamLike[T, This]) = {
+ def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R],
+ R <: spark.api.java.JavaRDDLike[T, R]](
+ dstream: JavaDStreamLike[T, This, R]) = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val ostream = new TestOutputStream(dstream.dstream,