aboutsummaryrefslogtreecommitdiff
path: root/external/twitter
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-06 01:47:53 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-06 01:47:53 -0800
commitd0fd3b9ad238294346eb3465c489eabd41fb2380 (patch)
treed7f7f17fc842589f80c3ef16669741e3499d9692 /external/twitter
parent977bcc36d4440ff562d5dbcc12449bf383d0d9e2 (diff)
downloadspark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.gz
spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.bz2
spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.zip
Changed JavaStreamingContextWith*** to ***Function in streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.
Diffstat (limited to 'external/twitter')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala (renamed from external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala)18
-rw-r--r--external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java19
2 files changed, 18 insertions, 19 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
index 0250364331..22e297a03a 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.twitter
+package org.apache.spark.streaming.api.java.twitter
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.twitter._
/**
* Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
* functions for creating Twitter input streams.
*/
-class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
+class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
/**
* Create a input stream that returns tweets received from Twitter using Twitter4J's default
@@ -37,7 +37,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
* twitter4j.oauth.accessTokenSecret.
*/
def twitterStream(): JavaDStream[Status] = {
- ssc.twitterStream(None)
+ javaStreamingContext.ssc.twitterStream(None)
}
/**
@@ -48,7 +48,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
* @param filters Set of filter strings to get only those tweets that match them
*/
def twitterStream(filters: Array[String]): JavaDStream[Status] = {
- ssc.twitterStream(None, filters)
+ javaStreamingContext.ssc.twitterStream(None, filters)
}
/**
@@ -60,7 +60,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
- ssc.twitterStream(None, filters, storageLevel)
+ javaStreamingContext.ssc.twitterStream(None, filters, storageLevel)
}
/**
@@ -68,7 +68,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
* @param twitterAuth Twitter4J Authorization
*/
def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth))
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth))
}
/**
@@ -80,7 +80,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
twitterAuth: Authorization,
filters: Array[String]
): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters)
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters)
}
/**
@@ -94,6 +94,6 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
}
}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index 34e4fbdd85..4564d6cd33 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.twitter;
import java.util.Arrays;
+
+import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;
import org.junit.Test;
import twitter4j.Status;
@@ -31,21 +33,18 @@ import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
@Test
public void testTwitterStream() {
- JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc);
+ TwitterFunctions twitterFunc = new TwitterFunctions(ssc);
String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
Authorization auth = NullAuthorization.getInstance();
// tests the API, does not actually test data receiving
- JavaDStream<Status> test1 = sscWithTwitter.twitterStream();
- JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters);
+ JavaDStream<Status> test1 = twitterFunc.twitterStream();
+ JavaDStream<Status> test2 = twitterFunc.twitterStream(filters);
JavaDStream<Status> test3 =
- sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth);
- JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters);
+ twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test4 = twitterFunc.twitterStream(auth);
+ JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters);
JavaDStream<Status> test6 =
- sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999);
+ twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
}
}