aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-17 21:07:09 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-17 21:25:49 -0800
commite0165bf7141086e28f88cd68ab7bc6249061c924 (patch)
treeeed0fbf2db1b1b93395632e57985a4fd072c76f6 /streaming
parent6fba7683c29b64a33a6daa28cc56bb5d20574314 (diff)
downloadspark-e0165bf7141086e28f88cd68ab7bc6249061c924.tar.gz
spark-e0165bf7141086e28f88cd68ab7bc6249061c924.tar.bz2
spark-e0165bf7141086e28f88cd68ab7bc6249061c924.zip
Adding queueStream and some slight refactoring
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala58
-rw-r--r--streaming/src/test/scala/spark/streaming/JavaAPISuite.java186
2 files changed, 163 insertions, 81 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index accac82e09..f82e6a37cc 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -10,6 +10,7 @@ import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import java.io.InputStream
import java.util.{Map => JMap}
+import spark.api.java.{JavaSparkContext, JavaRDD}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -39,6 +40,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
*/
def this(path: String) = this (new StreamingContext(path))
+ /** The underlying SparkContext */
+ val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
+
/**
* Create an input stream that pulls messages form a Kafka Broker.
* @param hostname Zookeper hostname.
@@ -255,6 +259,60 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
+ * Creates a input stream from an queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ *
+ * NOTE: changes to the queue after the stream is created will not be recognized.
+ * @param queue Queue of RDDs
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
+ sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
+ ssc.queueStream(sQueue)
+ }
+
+ /**
+ * Creates a input stream from an queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ *
+ * NOTE: changes to the queue after the stream is created will not be recognized.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
+ sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
+ ssc.queueStream(sQueue, oneAtATime)
+ }
+
+ /**
+ * Creates a input stream from an queue of RDDs. In each batch,
+ * it will process either one or all of the RDDs returned by the queue.
+ *
+ * NOTE: changes to the queue after the stream is created will not be recognized.
+ * @param queue Queue of RDDs
+ * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+ * @param defaultRDD Default RDD is returned by the DStream when the queue is empty
+ * @tparam T Type of objects in the RDD
+ */
+ def queueStream[T](
+ queue: java.util.Queue[JavaRDD[T]],
+ oneAtATime: Boolean,
+ defaultRDD: JavaRDD[T]): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
+ sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
+ ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd)
+ }
+
+ /**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. By default, the graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 374793b57e..8c94e13e65 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -12,6 +12,7 @@ import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
import spark.storage.StorageLevel;
import spark.streaming.api.java.JavaDStream;
@@ -28,17 +29,17 @@ import java.util.*;
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite implements Serializable {
- private transient JavaStreamingContext sc;
+ private transient JavaStreamingContext ssc;
@Before
public void setUp() {
- sc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
}
@After
public void tearDown() {
- sc.stop();
- sc = null;
+ ssc.stop();
+ ssc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port");
}
@@ -55,10 +56,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(3L),
Arrays.asList(1L));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream count = stream.count();
JavaTestUtils.attachTestOutputStream(count);
- List<List<Long>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
}
@@ -72,7 +73,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(5,5),
Arrays.asList(9,4));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
@@ -80,7 +81,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(letterCount);
- List<List<Integer>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
assertOrderInvariantEquals(expected, result);
}
@@ -98,10 +99,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,4,5,6),
Arrays.asList(7,8,9));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream windowed = stream.window(new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
assertOrderInvariantEquals(expected, result);
}
@@ -122,10 +123,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
Arrays.asList(13,14,15,16,17,18));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(sc, 8, 4);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
assertOrderInvariantEquals(expected, result);
}
@@ -145,10 +146,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,10,11,12),
Arrays.asList(13,14,15,16,17,18));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream windowed = stream.tumble(new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(sc, 6, 3);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3);
assertOrderInvariantEquals(expected, result);
}
@@ -163,7 +164,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("giants"),
Arrays.asList("yankees"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream filtered = stream.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
@@ -171,7 +172,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(filtered);
- List<List<String>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
assertOrderInvariantEquals(expected, result);
}
@@ -186,10 +187,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(Arrays.asList("giants", "dodgers")),
Arrays.asList(Arrays.asList("yankees", "red socks")));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream glommed = stream.glom();
JavaTestUtils.attachTestOutputStream(glommed);
- List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -204,7 +205,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("GIANTSDODGERS"),
Arrays.asList("YANKEESRED SOCKS"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> in) {
@@ -216,7 +217,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(mapped);
- List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -247,10 +248,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(15),
Arrays.asList(24));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream reduced = stream.reduce(new IntegerSum());
JavaTestUtils.attachTestOutputStream(reduced);
- List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@@ -268,16 +269,39 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(39),
Arrays.asList(24));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new IntegerDifference(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
Assert.assertEquals(expected, result);
}
@Test
+ public void testQueueStream() {
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
+ JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3));
+ JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6));
+ JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9));
+
+ LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
+ rdds.add(rdd1);
+ rdds.add(rdd2);
+ rdds.add(rdd3);
+
+ JavaDStream<Integer> stream = ssc.queueStream(rdds);
+ JavaTestUtils.attachTestOutputStream(stream);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testTransform() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3),
@@ -289,7 +313,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(6,7,8),
Arrays.asList(9,10,11));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
@Override
public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
@@ -301,7 +325,7 @@ public class JavaAPISuite implements Serializable {
});
}});
JavaTestUtils.attachTestOutputStream(transformed);
- List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
}
@@ -318,7 +342,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
Arrays.asList("a","t","h","l","e","t","i","c","s"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
@@ -326,7 +350,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<String>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
}
@@ -365,7 +389,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, String>(9, "c"),
new Tuple2<Integer, String>(9, "s")));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
@@ -377,7 +401,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@@ -399,12 +423,12 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(2,2,5,5),
Arrays.asList(3,3,6,6));
- JavaDStream stream1 = JavaTestUtils.attachTestInputStream(sc, inputData1, 2);
- JavaDStream stream2 = JavaTestUtils.attachTestInputStream(sc, inputData2, 2);
+ JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
+ JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
JavaDStream unioned = stream1.union(stream2);
JavaTestUtils.attachTestOutputStream(unioned);
- List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
}
@@ -436,7 +460,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = stream.map(
new PairFunction<String, String, Integer>() {
@Override
@@ -453,7 +477,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(filtered);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -492,12 +516,12 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey();
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -515,13 +539,13 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
- sc, inputData, 1);
+ ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
JavaTestUtils.attachTestOutputStream(reduced);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -539,7 +563,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
- sc, inputData, 1);
+ ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
@@ -551,7 +575,7 @@ public class JavaAPISuite implements Serializable {
}, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
JavaTestUtils.attachTestOutputStream(combined);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -569,12 +593,12 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Long>("new york", 2L)));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- sc, inputData, 1);
+ ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Long> counted = pairStream.countByKey();
JavaTestUtils.attachTestOutputStream(counted);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -592,13 +616,13 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, List<String>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@@ -615,13 +639,13 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("california", 10),
new Tuple2<String, Integer>("new york", 4)));
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@@ -638,7 +662,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("california", 14),
new Tuple2<String, Integer>("new york", 9)));
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
@@ -656,7 +680,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(updated);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@@ -673,13 +697,13 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("california", 10),
new Tuple2<String, Integer>("new york", 4)));
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@@ -700,13 +724,13 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Long>("new york", 2L)));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- sc, inputData, 1);
+ ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Long> counted =
pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@@ -726,7 +750,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, String>("new york", "ISLANDERS")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- sc, inputData, 1);
+ ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
@@ -737,7 +761,7 @@ public class JavaAPISuite implements Serializable {
});
JavaTestUtils.attachTestOutputStream(mapped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -765,7 +789,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, String>("new york", "islanders2")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- sc, inputData, 1);
+ ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
@@ -781,7 +805,7 @@ public class JavaAPISuite implements Serializable {
});
JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -815,16 +839,16 @@ public class JavaAPISuite implements Serializable {
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
- sc, stringStringKVStream1, 1);
+ ssc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
- sc, stringStringKVStream2, 1);
+ ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -858,16 +882,16 @@ public class JavaAPISuite implements Serializable {
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
- sc, stringStringKVStream1, 1);
+ ssc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
- sc, stringStringKVStream2, 1);
+ ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
JavaTestUtils.attachTestOutputStream(joined);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -887,9 +911,9 @@ public class JavaAPISuite implements Serializable {
File tempDir = Files.createTempDir();
- sc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+ ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
- JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
@@ -897,15 +921,15 @@ public class JavaAPISuite implements Serializable {
}
});
JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
- List<List<Integer>> initialResult = JavaTestUtils.runStreams(sc, 1, 1);
+ List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
assertOrderInvariantEquals(expectedInitial, initialResult);
Thread.sleep(1000);
- sc.stop();
- sc = new JavaStreamingContext(tempDir.getAbsolutePath());
- sc.start();
- List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(sc, 2, 2);
+ ssc.stop();
+ ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
+ ssc.start();
+ List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2);
assertOrderInvariantEquals(expectedFinal, finalResult);
}
@@ -922,7 +946,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(1,4),
Arrays.asList(8,7));
- JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
@@ -933,7 +957,7 @@ public class JavaAPISuite implements Serializable {
letterCount.checkpoint(new Duration(1000));
- List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(sc, 3, 3);
+ List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result1);
}
*/
@@ -945,15 +969,15 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
- JavaDStream test1 = sc.kafkaStream("localhost", 12345, "group", topics);
- JavaDStream test2 = sc.kafkaStream("localhost", 12345, "group", topics, offsets);
- JavaDStream test3 = sc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
+ JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
+ JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
@Test
public void testNetworkTextStream() {
- JavaDStream test = sc.networkTextStream("localhost", 12345);
+ JavaDStream test = ssc.networkTextStream("localhost", 12345);
}
@Test
@@ -973,7 +997,7 @@ public class JavaAPISuite implements Serializable {
}
}
- JavaDStream test = sc.networkStream(
+ JavaDStream test = ssc.networkStream(
"localhost",
12345,
new Converter(),
@@ -982,22 +1006,22 @@ public class JavaAPISuite implements Serializable {
@Test
public void testTextFileStream() {
- JavaDStream test = sc.textFileStream("/tmp/foo");
+ JavaDStream test = ssc.textFileStream("/tmp/foo");
}
@Test
public void testRawNetworkStream() {
- JavaDStream test = sc.rawNetworkStream("localhost", 12345);
+ JavaDStream test = ssc.rawNetworkStream("localhost", 12345);
}
@Test
public void testFlumeStream() {
- JavaDStream test = sc.flumeStream("localhost", 12345);
+ JavaDStream test = ssc.flumeStream("localhost", 12345);
}
@Test
public void testFileStream() {
JavaPairDStream<String, String> foo =
- sc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
+ ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
}
}