diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-23 16:25:07 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-23 16:25:07 -0800 |
commit | 41285eaae3642b73b3ac5007a35cc4e8f1d7d084 (patch) | |
tree | 7347664e7474271a1ca534b95c8b731f28045082 /streaming/src/test/java | |
parent | d8cee52d526497efdc02ea39e2fb721321ec0b4e (diff) | |
download | spark-41285eaae3642b73b3ac5007a35cc4e8f1d7d084.tar.gz spark-41285eaae3642b73b3ac5007a35cc4e8f1d7d084.tar.bz2 spark-41285eaae3642b73b3ac5007a35cc4e8f1d7d084.zip |
Fixed differences in APIs of StreamingContext and JavaStreamingContext. Change rawNetworkStream to rawSocketStream, and added twitter, actor, zeroMQ streams to JavaStreamingContext. Also added them to JavaAPISuite.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 37 |
1 files changed, 32 insertions, 5 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 4530af5f6a..3bed500f73 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -24,10 +24,16 @@ import spark.streaming.api.java.JavaStreamingContext; import spark.streaming.JavaTestUtils; import spark.streaming.JavaCheckpointTestUtils; import spark.streaming.dstream.KafkaPartitionKey; +import spark.streaming.InputStreamsSuite; import java.io.*; import java.util.*; +import akka.actor.Props; +import akka.zeromq.Subscribe; + + + // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. @@ -1205,12 +1211,12 @@ public class JavaAPISuite implements Serializable { } @Test - public void testNetworkTextStream() { + public void testSocketTextStream() { JavaDStream test = ssc.socketTextStream("localhost", 12345); } @Test - public void testNetworkString() { + public void testSocketString() { class Converter extends Function<InputStream, Iterable<String>> { public Iterable<String> call(InputStream in) { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); @@ -1239,13 +1245,13 @@ public class JavaAPISuite implements Serializable { } @Test - public void testRawNetworkStream() { - JavaDStream test = ssc.rawNetworkStream("localhost", 12345); + public void testRawSocketStream() { + JavaDStream test = ssc.rawSocketStream("localhost", 12345); } @Test public void testFlumeStream() { - JavaDStream test = ssc.flumeStream("localhost", 12345); + JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); } @Test @@ -1253,4 +1259,25 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); } + + @Test + public void testTwitterStream() { + String[] filters = new String[] { "good", "bad", "ugly" }; + JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testActorStream() { + JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testZeroMQStream() { + JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { + @Override + public Iterable<String> call(byte[][] b) throws Exception { + return null; + } + }); + } } |