diff options
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala | 2 | ||||
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala | 2 | ||||
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala | 1 | ||||
-rw-r--r-- | external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala | 27 | ||||
-rw-r--r-- | external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala) | 12 | ||||
-rw-r--r-- | external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala | 7 | ||||
-rw-r--r-- | project/SparkBuild.scala | 13 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 6 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 8 |
9 files changed, 64 insertions, 14 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 35b6329ab3..a00b3bde6e 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -23,6 +23,8 @@ import com.twitter.algebird._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.twitter._ + /** * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index 8bfde2a829..82156060a8 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -21,7 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import com.twitter.algebird.HyperLogLog._ import com.twitter.algebird.HyperLogLogMonoid -import org.apache.spark.streaming.dstream.TwitterInputDStream +import org.apache.spark.streaming.twitter._ /** * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index 27aa6b14bf..c4ded5e071 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples import org.apache.spark.streaming.{Seconds, StreamingContext} import StreamingContext._ import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.twitter._ /** * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala new file mode 100644 index 0000000000..fe66e28ce6 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala @@ -0,0 +1,27 @@ +package org.apache.spark.streaming.twitter + +import twitter4j.Status +import twitter4j.auth.Authorization +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ + + +class StreamingContextWithTwitter(ssc: StreamingContext) { + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * .consumerSecret, .accessToken and .accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Option[Authorization] = None, + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 387e15b0e6..97e48ebeca 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -15,18 +15,19 @@ * limitations under the License. */ -package org.apache.spark.streaming.dstream +package org.apache.spark.streaming.twitter -import org.apache.spark._ -import org.apache.spark.streaming._ -import storage.StorageLevel +import java.util.prefs.Preferences import twitter4j._ import twitter4j.auth.Authorization -import java.util.prefs.Preferences import twitter4j.conf.ConfigurationBuilder import twitter4j.conf.PropertyConfiguration import twitter4j.auth.OAuthAuthorization import twitter4j.auth.AccessToken +import org.apache.spark._ +import org.apache.spark.streaming._ +import org.apache.spark.streaming.dstream._ +import org.apache.spark.storage.StorageLevel /* A stream of Twitter statuses, potentially filtered by one or more keywords. * @@ -97,3 +98,4 @@ class TwitterReceiver( logInfo("Twitter receiver stopped") } } + diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala new file mode 100644 index 0000000000..89c202a730 --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala @@ -0,0 +1,7 @@ +package org.apache.spark.streaming + +package object twitter { + implicit def enrichMyStreamingContext(ssc: StreamingContext): StreamingContextWithTwitter = { + new StreamingContextWithTwitter(ssc) + } +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 7bcbd90bd3..f9ff781f38 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -50,7 +50,7 @@ object SparkBuild extends Build { .dependsOn(core, bagel, mllib) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) - .dependsOn(core, mllib, bagel, streaming) + .dependsOn(core, mllib, bagel, streaming, externalTwitter) lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming) @@ -60,6 +60,8 @@ object SparkBuild extends Build { lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core) + lazy val externalTwitter = Project("streaming-twitter", file("external/twitter"), settings = twitterSettings) dependsOn(streaming) + lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings) .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) @@ -313,7 +315,7 @@ object SparkBuild extends Build { excludeAll(excludeNetty), "org.eclipse.paho" % "mqtt-client" % "0.4.0", "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), - "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), + // "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) ) ) @@ -354,4 +356,11 @@ object SparkBuild extends Build { case _ => MergeStrategy.first } ) + + def twitterSettings() = streamingSettings ++ Seq( + name := "spark-twitter", + libraryDependencies ++= Seq( + "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty) + ) + ) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 41da028a3c..25b9b70b2c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -45,8 +45,8 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path -import twitter4j.Status -import twitter4j.auth.Authorization +//import twitter4j.Status +//import twitter4j.auth.Authorization import org.apache.spark.streaming.scheduler._ import akka.util.ByteString @@ -414,6 +414,7 @@ class StreamingContext private ( fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } + /* /** * Create a input stream that returns tweets received from Twitter. * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth @@ -431,6 +432,7 @@ class StreamingContext private ( registerInputStream(inputStream) inputStream } + */ /** * Create an input stream from a queue of RDDs. In each batch, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 78d318cf27..b32cfbb677 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -25,13 +25,13 @@ import scala.collection.JavaConversions._ import scala.reflect.ClassTag import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import twitter4j.Status +//import twitter4j.Status import akka.actor.Props import akka.actor.SupervisorStrategy import akka.zeromq.Subscribe import akka.util.ByteString -import twitter4j.auth.Authorization +//import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -338,7 +338,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { ssc.flumeStream(hostname, port) } - + /* /** * Create a input stream that returns tweets received from Twitter. * @param twitterAuth Twitter4J Authorization object @@ -409,7 +409,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def twitterStream(): JavaDStream[Status] = { ssc.twitterStream() } - + */ /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor |