diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
commit | f4e40661912af2a23e250a49f72f00675172e2de (patch) | |
tree | 97d40d041b08bf8a320f908e7b241cce9432c014 /external/twitter/src | |
parent | 6e43039614ed1ec55a134fb82fb3e8d4e80996ef (diff) | |
download | spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.gz spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.bz2 spark-f4e40661912af2a23e250a49f72f00675172e2de.zip |
Refactored kafka, flume, zeromq, mqtt as separate external projects, with their own self-contained scala API, java API, scala unit tests and java unit tests. Updated examples to use the external projects.
Diffstat (limited to 'external/twitter/src')
7 files changed, 288 insertions, 30 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala new file mode 100644 index 0000000000..0250364331 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} + +/** + * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra + * functions for creating Twitter input streams. + */ +class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext) + extends JavaStreamingContext(javaStreamingContext.ssc) { + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + */ + def twitterStream(): JavaDStream[Status] = { + ssc.twitterStream(None) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream(filters: Array[String]): JavaDStream[Status] = { + ssc.twitterStream(None, filters) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = { + ssc.twitterStream(None, filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + */ + def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth)) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String] + ): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth), filters) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization object + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala deleted file mode 100644 index fe66e28ce6..0000000000 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.spark.streaming.twitter - -import twitter4j.Status -import twitter4j.auth.Authorization -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ - - -class StreamingContextWithTwitter(ssc: StreamingContext) { - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth - * authorization; this uses the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Option[Authorization] = None, - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { - val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) - ssc.registerInputStream(inputStream) - inputStream - } -} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala new file mode 100644 index 0000000000..e91049d9b1 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + +/** + * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]] + * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions. + */ +class TwitterFunctions(ssc: StreamingContext) { + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala index 89c202a730..23f82c5885 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala @@ -1,7 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming package object twitter { - implicit def enrichMyStreamingContext(ssc: StreamingContext): StreamingContextWithTwitter = { - new StreamingContextWithTwitter(ssc) - } + implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc) } diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java new file mode 100644 index 0000000000..34e4fbdd85 --- /dev/null +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter; + +import java.util.Arrays; +import org.junit.Test; + +import twitter4j.Status; +import twitter4j.auth.Authorization; +import twitter4j.auth.NullAuthorization; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; + +public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { + @Test + public void testTwitterStream() { + JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc); + String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray(); + Authorization auth = NullAuthorization.getInstance(); + + // tests the API, does not actually test data receiving + JavaDStream<Status> test1 = sscWithTwitter.twitterStream(); + JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters); + JavaDStream<Status> test3 = + sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth); + JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters); + JavaDStream<Status> test6 = + sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999); + } +} diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/twitter/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala new file mode 100644 index 0000000000..d7f6d35e07 --- /dev/null +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel +import twitter4j.auth.{NullAuthorization, Authorization} + +class TwitterStreamSuite extends TestSuiteBase { + + test("kafka input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val filters = Seq("filter1", "filter2") + val authorization: Authorization = NullAuthorization.getInstance() + + // tests the API, does not actually test data receiving + val test1 = ssc.twitterStream(None) + val test2 = ssc.twitterStream(None, filters) + val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4 = ssc.twitterStream(Some(authorization)) + val test5 = ssc.twitterStream(Some(authorization), filters) + val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) + + // Note that actually testing the data receiving is hard as authentication keys are + // necessary for accessing Twitter live stream + } +} |