aboutsummaryrefslogtreecommitdiff
path: root/external/twitter/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-30 11:13:24 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-30 11:13:24 -0800
commitf4e40661912af2a23e250a49f72f00675172e2de (patch)
tree97d40d041b08bf8a320f908e7b241cce9432c014 /external/twitter/src
parent6e43039614ed1ec55a134fb82fb3e8d4e80996ef (diff)
downloadspark-f4e40661912af2a23e250a49f72f00675172e2de.tar.gz
spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.bz2
spark-f4e40661912af2a23e250a49f72f00675172e2de.zip
Refactored kafka, flume, zeromq, mqtt as separate external projects, with their own self-contained scala API, java API, scala unit tests and java unit tests. Updated examples to use the external projects.
Diffstat (limited to 'external/twitter/src')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala99
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala27
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala49
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala21
-rw-r--r--external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java51
-rw-r--r--external/twitter/src/test/resources/log4j.properties29
-rw-r--r--external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala42
7 files changed, 288 insertions, 30 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
new file mode 100644
index 0000000000..0250364331
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+
+/**
+ * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
+ * functions for creating Twitter input streams.
+ */
+class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext)
+ extends JavaStreamingContext(javaStreamingContext.ssc) {
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ */
+ def twitterStream(): JavaDStream[Status] = {
+ ssc.twitterStream(None)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(filters: Array[String]): JavaDStream[Status] = {
+ ssc.twitterStream(None, filters)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
+ ssc.twitterStream(None, filters, storageLevel)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization
+ */
+ def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
+ ssc.twitterStream(Some(twitterAuth))
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(
+ twitterAuth: Authorization,
+ filters: Array[String]
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(Some(twitterAuth), filters)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization object
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Authorization,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+ }
+}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala
deleted file mode 100644
index fe66e28ce6..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.spark.streaming.twitter
-
-import twitter4j.Status
-import twitter4j.auth.Authorization
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-
-
-class StreamingContextWithTwitter(ssc: StreamingContext) {
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
- * authorization; this uses the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret.
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- twitterAuth: Option[Authorization] = None,
- filters: Seq[String] = Nil,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
- val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
- }
-}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala
new file mode 100644
index 0000000000..e91049d9b1
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+/**
+ * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions.
+ */
+class TwitterFunctions(ssc: StreamingContext) {
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Option[Authorization],
+ filters: Seq[String] = Nil,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
index 89c202a730..23f82c5885 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
@@ -1,7 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.streaming
package object twitter {
- implicit def enrichMyStreamingContext(ssc: StreamingContext): StreamingContextWithTwitter = {
- new StreamingContextWithTwitter(ssc)
- }
+ implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc)
}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
new file mode 100644
index 0000000000..34e4fbdd85
--- /dev/null
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter;
+
+import java.util.Arrays;
+import org.junit.Test;
+
+import twitter4j.Status;
+import twitter4j.auth.Authorization;
+import twitter4j.auth.NullAuthorization;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testTwitterStream() {
+ JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc);
+ String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
+ Authorization auth = NullAuthorization.getInstance();
+
+ // tests the API, does not actually test data receiving
+ JavaDStream<Status> test1 = sscWithTwitter.twitterStream();
+ JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters);
+ JavaDStream<Status> test3 =
+ sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth);
+ JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters);
+ JavaDStream<Status> test6 =
+ sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+
+ // To verify that JavaStreamingContextWithKafka is also StreamingContext
+ JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999);
+ }
+}
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/twitter/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
new file mode 100644
index 0000000000..d7f6d35e07
--- /dev/null
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+import twitter4j.auth.{NullAuthorization, Authorization}
+
+class TwitterStreamSuite extends TestSuiteBase {
+
+ test("kafka input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val filters = Seq("filter1", "filter2")
+ val authorization: Authorization = NullAuthorization.getInstance()
+
+ // tests the API, does not actually test data receiving
+ val test1 = ssc.twitterStream(None)
+ val test2 = ssc.twitterStream(None, filters)
+ val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4 = ssc.twitterStream(Some(authorization))
+ val test5 = ssc.twitterStream(Some(authorization), filters)
+ val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ // Note that actually testing the data receiving is hard as authentication keys are
+ // necessary for accessing Twitter live stream
+ }
+}