aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-23 16:25:07 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-23 16:25:07 -0800
commit41285eaae3642b73b3ac5007a35cc4e8f1d7d084 (patch)
tree7347664e7474271a1ca534b95c8b731f28045082 /streaming/src/test/java
parentd8cee52d526497efdc02ea39e2fb721321ec0b4e (diff)
downloadspark-41285eaae3642b73b3ac5007a35cc4e8f1d7d084.tar.gz
spark-41285eaae3642b73b3ac5007a35cc4e8f1d7d084.tar.bz2
spark-41285eaae3642b73b3ac5007a35cc4e8f1d7d084.zip
Fixed differences in APIs of StreamingContext and JavaStreamingContext. Change rawNetworkStream to rawSocketStream, and added twitter, actor, zeroMQ streams to JavaStreamingContext. Also added them to JavaAPISuite.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java37
1 files changed, 32 insertions, 5 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 4530af5f6a..3bed500f73 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -24,10 +24,16 @@ import spark.streaming.api.java.JavaStreamingContext;
import spark.streaming.JavaTestUtils;
import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
+import spark.streaming.InputStreamsSuite;
import java.io.*;
import java.util.*;
+import akka.actor.Props;
+import akka.zeromq.Subscribe;
+
+
+
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
@@ -1205,12 +1211,12 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testNetworkTextStream() {
+ public void testSocketTextStream() {
JavaDStream test = ssc.socketTextStream("localhost", 12345);
}
@Test
- public void testNetworkString() {
+ public void testSocketString() {
class Converter extends Function<InputStream, Iterable<String>> {
public Iterable<String> call(InputStream in) {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
@@ -1239,13 +1245,13 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testRawNetworkStream() {
- JavaDStream test = ssc.rawNetworkStream("localhost", 12345);
+ public void testRawSocketStream() {
+ JavaDStream test = ssc.rawSocketStream("localhost", 12345);
}
@Test
public void testFlumeStream() {
- JavaDStream test = ssc.flumeStream("localhost", 12345);
+ JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
}
@Test
@@ -1253,4 +1259,25 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, String> foo =
ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
}
+
+ @Test
+ public void testTwitterStream() {
+ String[] filters = new String[] { "good", "bad", "ugly" };
+ JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY());
+ }
+
+ @Test
+ public void testActorStream() {
+ JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
+ }
+
+ @Test
+ public void testZeroMQStream() {
+ JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
+ @Override
+ public Iterable<String> call(byte[][] b) throws Exception {
+ return null;
+ }
+ });
+ }
}