diff options
Diffstat (limited to 'external/twitter')
-rw-r--r-- | external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala (renamed from external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala) | 18 | ||||
-rw-r--r-- | external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java | 19 |
2 files changed, 18 insertions, 19 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala index 0250364331..22e297a03a 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala @@ -15,20 +15,20 @@ * limitations under the License. */ -package org.apache.spark.streaming.twitter +package org.apache.spark.streaming.api.java.twitter import twitter4j.Status import twitter4j.auth.Authorization import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.twitter._ /** * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra * functions for creating Twitter input streams. */ -class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext) - extends JavaStreamingContext(javaStreamingContext.ssc) { +class TwitterFunctions(javaStreamingContext: JavaStreamingContext) { /** * Create a input stream that returns tweets received from Twitter using Twitter4J's default @@ -37,7 +37,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext * twitter4j.oauth.accessTokenSecret. */ def twitterStream(): JavaDStream[Status] = { - ssc.twitterStream(None) + javaStreamingContext.ssc.twitterStream(None) } /** @@ -48,7 +48,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext * @param filters Set of filter strings to get only those tweets that match them */ def twitterStream(filters: Array[String]): JavaDStream[Status] = { - ssc.twitterStream(None, filters) + javaStreamingContext.ssc.twitterStream(None, filters) } /** @@ -60,7 +60,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext * @param storageLevel Storage level to use for storing the received objects */ def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = { - ssc.twitterStream(None, filters, storageLevel) + javaStreamingContext.ssc.twitterStream(None, filters, storageLevel) } /** @@ -68,7 +68,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext * @param twitterAuth Twitter4J Authorization */ def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth)) + javaStreamingContext.ssc.twitterStream(Some(twitterAuth)) } /** @@ -80,7 +80,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext twitterAuth: Authorization, filters: Array[String] ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters) + javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters) } /** @@ -94,6 +94,6 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext filters: Array[String], storageLevel: StorageLevel ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel) } } diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java index 34e4fbdd85..4564d6cd33 100644 --- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -18,6 +18,8 @@ package org.apache.spark.streaming.twitter; import java.util.Arrays; + +import org.apache.spark.streaming.api.java.twitter.TwitterFunctions; import org.junit.Test; import twitter4j.Status; @@ -31,21 +33,18 @@ import org.apache.spark.streaming.api.java.JavaDStream; public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { @Test public void testTwitterStream() { - JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc); + TwitterFunctions twitterFunc = new TwitterFunctions(ssc); String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray(); Authorization auth = NullAuthorization.getInstance(); // tests the API, does not actually test data receiving - JavaDStream<Status> test1 = sscWithTwitter.twitterStream(); - JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters); + JavaDStream<Status> test1 = twitterFunc.twitterStream(); + JavaDStream<Status> test2 = twitterFunc.twitterStream(filters); JavaDStream<Status> test3 = - sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth); - JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters); + twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<Status> test4 = twitterFunc.twitterStream(auth); + JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters); JavaDStream<Status> test6 = - sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); - - // To verify that JavaStreamingContextWithKafka is also StreamingContext - JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999); + twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); } } |