From aa99f226a691ddcb4442d60f4cd4908f434cc4ce Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 7 Jan 2014 01:56:15 -0800 Subject: Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams. --- .../api/java/twitter/TwitterFunctions.scala | 99 ---------------- .../spark/streaming/twitter/TwitterFunctions.scala | 49 -------- .../spark/streaming/twitter/TwitterUtils.scala | 126 +++++++++++++++++++++ .../apache/spark/streaming/twitter/package.scala | 22 ---- .../streaming/twitter/JavaTwitterStreamSuite.java | 20 ++-- .../streaming/twitter/TwitterStreamSuite.scala | 13 ++- 6 files changed, 141 insertions(+), 188 deletions(-) delete mode 100644 external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala delete mode 100644 external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala create mode 100644 external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala delete mode 100644 external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala (limited to 'external/twitter') diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala deleted file mode 100644 index 22e297a03a..0000000000 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.api.java.twitter - -import twitter4j.Status -import twitter4j.auth.Authorization - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.twitter._ - -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating Twitter input streams. - */ -class TwitterFunctions(javaStreamingContext: JavaStreamingContext) { - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and - * twitter4j.oauth.accessTokenSecret. - */ - def twitterStream(): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(None) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and - * twitter4j.oauth.accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream(filters: Array[String]): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(None, filters) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and - * twitter4j.oauth.accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(None, filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - */ - def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(Some(twitterAuth)) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String] - ): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization object - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel) - } -} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala deleted file mode 100644 index e91049d9b1..0000000000 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.twitter - -import twitter4j.Status -import twitter4j.auth.Authorization - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ - -/** - * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]] - * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions. - */ -class TwitterFunctions(ssc: StreamingContext) { - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth - * authorization; this uses the system properties twitter4j.oauth.consumerKey, - * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and - * twitter4j.oauth.accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Option[Authorization], - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { - val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) - ssc.registerInputStream(inputStream) - inputStream - } -} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala new file mode 100644 index 0000000000..5e506ffabc --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} + +object TwitterUtils { + /** + * Create a input stream that returns tweets received from Twitter. + * @param ssc StreamingContext object + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def createStream( + ssc: StreamingContext, + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param jssc JavaStreamingContext object + */ + def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = { + createStream(jssc.ssc, None) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param jssc JavaStreamingContext object + * @param filters Set of filter strings to get only those tweets that match them + */ + def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = { + createStream(jssc.ssc, None, filters) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret. + * @param jssc JavaStreamingContext object + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def createStream( + jssc: JavaStreamingContext, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + createStream(jssc.ssc, None, filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param jssc JavaStreamingContext object + * @param twitterAuth Twitter4J Authorization + */ + def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = { + createStream(jssc.ssc, Some(twitterAuth)) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param jssc JavaStreamingContext object + * @param twitterAuth Twitter4J Authorization + * @param filters Set of filter strings to get only those tweets that match them + */ + def createStream( + jssc: JavaStreamingContext, + twitterAuth: Authorization, + filters: Array[String] + ): JavaDStream[Status] = { + createStream(jssc.ssc, Some(twitterAuth), filters) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param jssc JavaStreamingContext object + * @param twitterAuth Twitter4J Authorization object + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def createStream( + jssc: JavaStreamingContext, + twitterAuth: Authorization, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel) + } +} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala deleted file mode 100644 index 23f82c5885..0000000000 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -package object twitter { - implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc) -} diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java index 4564d6cd33..e46b4e5c75 100644 --- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -19,13 +19,10 @@ package org.apache.spark.streaming.twitter; import java.util.Arrays; -import org.apache.spark.streaming.api.java.twitter.TwitterFunctions; import org.junit.Test; - import twitter4j.Status; import twitter4j.auth.Authorization; import twitter4j.auth.NullAuthorization; - import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; @@ -33,18 +30,17 @@ import org.apache.spark.streaming.api.java.JavaDStream; public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { @Test public void testTwitterStream() { - TwitterFunctions twitterFunc = new TwitterFunctions(ssc); String[] filters = (String[])Arrays.asList("filter1", "filter2").toArray(); Authorization auth = NullAuthorization.getInstance(); // tests the API, does not actually test data receiving - JavaDStream test1 = twitterFunc.twitterStream(); - JavaDStream test2 = twitterFunc.twitterStream(filters); - JavaDStream test3 = - twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream test4 = twitterFunc.twitterStream(auth); - JavaDStream test5 = twitterFunc.twitterStream(auth, filters); - JavaDStream test6 = - twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream test1 = TwitterUtils.createStream(ssc); + JavaDStream test2 = TwitterUtils.createStream(ssc, filters); + JavaDStream test3 = TwitterUtils.createStream( + ssc, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream test4 = TwitterUtils.createStream(ssc, auth); + JavaDStream test5 = TwitterUtils.createStream(ssc, auth, filters); + JavaDStream test6 = TwitterUtils.createStream(ssc, + auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); } } diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala index d7f6d35e07..a0a8fe617b 100644 --- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -29,12 +29,13 @@ class TwitterStreamSuite extends TestSuiteBase { val authorization: Authorization = NullAuthorization.getInstance() // tests the API, does not actually test data receiving - val test1 = ssc.twitterStream(None) - val test2 = ssc.twitterStream(None, filters) - val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) - val test4 = ssc.twitterStream(Some(authorization)) - val test5 = ssc.twitterStream(Some(authorization), filters) - val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test1 = TwitterUtils.createStream(ssc, None) + val test2 = TwitterUtils.createStream(ssc, None, filters) + val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4 = TwitterUtils.createStream(ssc, Some(authorization)) + val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters) + val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters, + StorageLevel.MEMORY_AND_DISK_SER_2) // Note that actually testing the data receiving is hard as authentication keys are // necessary for accessing Twitter live stream -- cgit v1.2.3