aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-26 18:02:49 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-26 18:02:49 -0800
commit6e43039614ed1ec55a134fb82fb3e8d4e80996ef (patch)
tree14b2325558807a1a0f82a79707c2d2d975f008b7 /streaming
parente240bad03b9f9e19cb84b0914b729c8d109d4815 (diff)
downloadspark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.tar.gz
spark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.tar.bz2
spark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.zip
Refactored streaming project to separate out the twitter functionality.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala99
3 files changed, 8 insertions, 105 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 41da028a3c..25b9b70b2c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -45,8 +45,8 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
-import twitter4j.Status
-import twitter4j.auth.Authorization
+//import twitter4j.Status
+//import twitter4j.auth.Authorization
import org.apache.spark.streaming.scheduler._
import akka.util.ByteString
@@ -414,6 +414,7 @@ class StreamingContext private (
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
+ /*
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
@@ -431,6 +432,7 @@ class StreamingContext private (
registerInputStream(inputStream)
inputStream
}
+ */
/**
* Create an input stream from a queue of RDDs. In each batch,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 78d318cf27..b32cfbb677 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -25,13 +25,13 @@ import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import twitter4j.Status
+//import twitter4j.Status
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
import akka.util.ByteString
-import twitter4j.auth.Authorization
+//import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -338,7 +338,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port)
}
-
+ /*
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J Authorization object
@@ -409,7 +409,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def twitterStream(): JavaDStream[Status] = {
ssc.twitterStream()
}
-
+ */
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala
deleted file mode 100644
index 387e15b0e6..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark._
-import org.apache.spark.streaming._
-import storage.StorageLevel
-import twitter4j._
-import twitter4j.auth.Authorization
-import java.util.prefs.Preferences
-import twitter4j.conf.ConfigurationBuilder
-import twitter4j.conf.PropertyConfiguration
-import twitter4j.auth.OAuthAuthorization
-import twitter4j.auth.AccessToken
-
-/* A stream of Twitter statuses, potentially filtered by one or more keywords.
-*
-* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
-* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
-* such that this may return a sampled subset of all tweets during each interval.
-*
-* If no Authorization object is provided, initializes OAuth authorization using the system
-* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
-*/
-private[streaming]
-class TwitterInputDStream(
- @transient ssc_ : StreamingContext,
- twitterAuth: Option[Authorization],
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
-
- private def createOAuthAuthorization(): Authorization = {
- new OAuthAuthorization(new ConfigurationBuilder().build())
- }
-
- private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
-
- override def getReceiver(): NetworkReceiver[Status] = {
- new TwitterReceiver(authorization, filters, storageLevel)
- }
-}
-
-private[streaming]
-class TwitterReceiver(
- twitterAuth: Authorization,
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Status] {
-
- var twitterStream: TwitterStream = _
- lazy val blockGenerator = new BlockGenerator(storageLevel)
-
- protected override def onStart() {
- blockGenerator.start()
- twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
- twitterStream.addListener(new StatusListener {
- def onStatus(status: Status) = {
- blockGenerator += status
- }
- // Unimplemented
- def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
- def onTrackLimitationNotice(i: Int) {}
- def onScrubGeo(l: Long, l1: Long) {}
- def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) { stopOnError(e) }
- })
-
- val query: FilterQuery = new FilterQuery
- if (filters.size > 0) {
- query.track(filters.toArray)
- twitterStream.filter(query)
- } else {
- twitterStream.sample()
- }
- logInfo("Twitter receiver started")
- }
-
- protected override def onStop() {
- blockGenerator.stop()
- twitterStream.shutdown()
- logInfo("Twitter receiver stopped")
- }
-}