aboutsummaryrefslogtreecommitdiff
path: root/external/twitter/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-07 01:56:15 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-07 01:56:15 -0800
commitaa99f226a691ddcb4442d60f4cd4908f434cc4ce (patch)
tree33a1614e3d5ee7a050776e3601ba8c7430b573f8 /external/twitter/src
parent3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686 (diff)
downloadspark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.gz
spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.bz2
spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.zip
Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams.
Diffstat (limited to 'external/twitter/src')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala49
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala (renamed from external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala)75
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala22
-rw-r--r--external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java20
-rw-r--r--external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala13
5 files changed, 66 insertions, 113 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala
deleted file mode 100644
index e91049d9b1..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import twitter4j.Status
-import twitter4j.auth.Authorization
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-
-/**
- * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions.
- */
-class TwitterFunctions(ssc: StreamingContext) {
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
- * authorization; this uses the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- twitterAuth: Option[Authorization],
- filters: Seq[String] = Nil,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
- val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
- }
-}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 22e297a03a..5e506ffabc 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -15,29 +15,45 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.api.java.twitter
+package org.apache.spark.streaming.twitter
import twitter4j.Status
import twitter4j.auth.Authorization
-
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, DStream}
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.twitter._
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Twitter input streams.
- */
-class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
+object TwitterUtils {
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param ssc StreamingContext object
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ ssc: StreamingContext,
+ twitterAuth: Option[Authorization],
+ filters: Seq[String] = Nil,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
/**
* Create a input stream that returns tweets received from Twitter using Twitter4J's default
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
+ * @param jssc JavaStreamingContext object
*/
- def twitterStream(): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(None)
+ def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+ createStream(jssc.ssc, None)
}
/**
@@ -45,10 +61,11 @@ class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
+ * @param jssc JavaStreamingContext object
* @param filters Set of filter strings to get only those tweets that match them
*/
- def twitterStream(filters: Array[String]): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(None, filters)
+ def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+ createStream(jssc.ssc, None, filters)
}
/**
@@ -56,44 +73,54 @@ class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
- * @param filters Set of filter strings to get only those tweets that match them
+ * @param jssc JavaStreamingContext object
+ * @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
- def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(None, filters, storageLevel)
+ def createStream(
+ jssc: JavaStreamingContext,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ createStream(jssc.ssc, None, filters, storageLevel)
}
/**
* Create a input stream that returns tweets received from Twitter.
+ * @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
*/
- def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(Some(twitterAuth))
+ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+ createStream(jssc.ssc, Some(twitterAuth))
}
/**
* Create a input stream that returns tweets received from Twitter.
+ * @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
- * @param filters Set of filter strings to get only those tweets that match them
+ * @param filters Set of filter strings to get only those tweets that match them
*/
- def twitterStream(
+ def createStream(
+ jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters)
+ createStream(jssc.ssc, Some(twitterAuth), filters)
}
/**
* Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization object
- * @param filters Set of filter strings to get only those tweets that match them
+ * @param jssc JavaStreamingContext object
+ * @param twitterAuth Twitter4J Authorization object
+ * @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
- def twitterStream(
+ def createStream(
+ jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+ createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
deleted file mode 100644
index 23f82c5885..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object twitter {
- implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc)
-}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index 4564d6cd33..e46b4e5c75 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -19,13 +19,10 @@ package org.apache.spark.streaming.twitter;
import java.util.Arrays;
-import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;
import org.junit.Test;
-
import twitter4j.Status;
import twitter4j.auth.Authorization;
import twitter4j.auth.NullAuthorization;
-
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -33,18 +30,17 @@ import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
@Test
public void testTwitterStream() {
- TwitterFunctions twitterFunc = new TwitterFunctions(ssc);
String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
Authorization auth = NullAuthorization.getInstance();
// tests the API, does not actually test data receiving
- JavaDStream<Status> test1 = twitterFunc.twitterStream();
- JavaDStream<Status> test2 = twitterFunc.twitterStream(filters);
- JavaDStream<Status> test3 =
- twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<Status> test4 = twitterFunc.twitterStream(auth);
- JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters);
- JavaDStream<Status> test6 =
- twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test1 = TwitterUtils.createStream(ssc);
+ JavaDStream<Status> test2 = TwitterUtils.createStream(ssc, filters);
+ JavaDStream<Status> test3 = TwitterUtils.createStream(
+ ssc, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test4 = TwitterUtils.createStream(ssc, auth);
+ JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters);
+ JavaDStream<Status> test6 = TwitterUtils.createStream(ssc,
+ auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index d7f6d35e07..a0a8fe617b 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -29,12 +29,13 @@ class TwitterStreamSuite extends TestSuiteBase {
val authorization: Authorization = NullAuthorization.getInstance()
// tests the API, does not actually test data receiving
- val test1 = ssc.twitterStream(None)
- val test2 = ssc.twitterStream(None, filters)
- val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test4 = ssc.twitterStream(Some(authorization))
- val test5 = ssc.twitterStream(Some(authorization), filters)
- val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1 = TwitterUtils.createStream(ssc, None)
+ val test2 = TwitterUtils.createStream(ssc, None, filters)
+ val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4 = TwitterUtils.createStream(ssc, Some(authorization))
+ val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
+ val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
+ StorageLevel.MEMORY_AND_DISK_SER_2)
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream