diff options
Diffstat (limited to 'external/twitter')
6 files changed, 433 insertions, 0 deletions
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml new file mode 100644 index 0000000000..216e6c1d8f --- /dev/null +++ b/external/twitter/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent</artifactId> + <version>0.9.0-incubating-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-twitter_2.10</artifactId> + <packaging>jar</packaging> + <name>Spark Project External Twitter</name> + <url>http://spark.incubator.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + <version>3.0.3</version> + <exclusions> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala new file mode 100644 index 0000000000..5cc721d7f9 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import java.util.prefs.Preferences +import twitter4j._ +import twitter4j.auth.Authorization +import twitter4j.conf.ConfigurationBuilder +import twitter4j.conf.PropertyConfiguration +import twitter4j.auth.OAuthAuthorization +import twitter4j.auth.AccessToken +import org.apache.spark._ +import org.apache.spark.streaming._ +import org.apache.spark.streaming.dstream._ +import org.apache.spark.storage.StorageLevel + +/* A stream of Twitter statuses, potentially filtered by one or more keywords. +* +* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials. +* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is +* such that this may return a sampled subset of all tweets during each interval. +* +* If no Authorization object is provided, initializes OAuth authorization using the system +* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret. +*/ +private[streaming] +class TwitterInputDStream( + @transient ssc_ : StreamingContext, + twitterAuth: Option[Authorization], + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkInputDStream[Status](ssc_) { + + private def createOAuthAuthorization(): Authorization = { + new OAuthAuthorization(new ConfigurationBuilder().build()) + } + + private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) + + override def getReceiver(): NetworkReceiver[Status] = { + new TwitterReceiver(authorization, filters, storageLevel) + } +} + +private[streaming] +class TwitterReceiver( + twitterAuth: Authorization, + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkReceiver[Status] { + + var twitterStream: TwitterStream = _ + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + blockGenerator.start() + twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) + twitterStream.addListener(new StatusListener { + def onStatus(status: Status) = { + blockGenerator += status + } + // Unimplemented + def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} + def onTrackLimitationNotice(i: Int) {} + def onScrubGeo(l: Long, l1: Long) {} + def onStallWarning(stallWarning: StallWarning) {} + def onException(e: Exception) { stopOnError(e) } + }) + + val query: FilterQuery = new FilterQuery + if (filters.size > 0) { + query.track(filters.toArray) + twitterStream.filter(query) + } else { + twitterStream.sample() + } + logInfo("Twitter receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + twitterStream.shutdown() + logInfo("Twitter receiver stopped") + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala new file mode 100644 index 0000000000..5e506ffabc --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} + +object TwitterUtils { + /** + * Create a input stream that returns tweets received from Twitter. + * @param ssc StreamingContext object + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def createStream( + ssc: StreamingContext, + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param jssc JavaStreamingContext object + */ + def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = { + createStream(jssc.ssc, None) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param jssc JavaStreamingContext object + * @param filters Set of filter strings to get only those tweets that match them + */ + def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = { + createStream(jssc.ssc, None, filters) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param jssc JavaStreamingContext object + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def createStream( + jssc: JavaStreamingContext, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + createStream(jssc.ssc, None, filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param jssc JavaStreamingContext object + * @param twitterAuth Twitter4J Authorization + */ + def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = { + createStream(jssc.ssc, Some(twitterAuth)) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param jssc JavaStreamingContext object + * @param twitterAuth Twitter4J Authorization + * @param filters Set of filter strings to get only those tweets that match them + */ + def createStream( + jssc: JavaStreamingContext, + twitterAuth: Authorization, + filters: Array[String] + ): JavaDStream[Status] = { + createStream(jssc.ssc, Some(twitterAuth), filters) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param jssc JavaStreamingContext object + * @param twitterAuth Twitter4J Authorization object + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def createStream( + jssc: JavaStreamingContext, + twitterAuth: Authorization, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel) + } +} diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java new file mode 100644 index 0000000000..e46b4e5c75 --- /dev/null +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter; + +import java.util.Arrays; + +import org.junit.Test; +import twitter4j.Status; +import twitter4j.auth.Authorization; +import twitter4j.auth.NullAuthorization; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; + +public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { + @Test + public void testTwitterStream() { + String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray(); + Authorization auth = NullAuthorization.getInstance(); + + // tests the API, does not actually test data receiving + JavaDStream<Status> test1 = TwitterUtils.createStream(ssc); + JavaDStream<Status> test2 = TwitterUtils.createStream(ssc, filters); + JavaDStream<Status> test3 = TwitterUtils.createStream( + ssc, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<Status> test4 = TwitterUtils.createStream(ssc, auth); + JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters); + JavaDStream<Status> test6 = TwitterUtils.createStream(ssc, + auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + } +} diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/twitter/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala new file mode 100644 index 0000000000..a0a8fe617b --- /dev/null +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel +import twitter4j.auth.{NullAuthorization, Authorization} + +class TwitterStreamSuite extends TestSuiteBase { + + test("kafka input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val filters = Seq("filter1", "filter2") + val authorization: Authorization = NullAuthorization.getInstance() + + // tests the API, does not actually test data receiving + val test1 = TwitterUtils.createStream(ssc, None) + val test2 = TwitterUtils.createStream(ssc, None, filters) + val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4 = TwitterUtils.createStream(ssc, Some(authorization)) + val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters) + val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters, + StorageLevel.MEMORY_AND_DISK_SER_2) + + // Note that actually testing the data receiving is hard as authentication keys are + // necessary for accessing Twitter live stream + } +} |