aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
Diffstat (limited to 'external')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala58
1 files changed, 35 insertions, 23 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 7bca140711..5ea2e5549d 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -63,36 +63,48 @@ class TwitterReceiver(
storageLevel: StorageLevel
) extends Receiver[Status](storageLevel) with Logging {
- var twitterStream: TwitterStream = _
+ private var twitterStream: TwitterStream = _
def onStart() {
- twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
- twitterStream.addListener(new StatusListener {
- def onStatus(status: Status) = {
- store(status)
- }
- // Unimplemented
- def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
- def onTrackLimitationNotice(i: Int) {}
- def onScrubGeo(l: Long, l1: Long) {}
- def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) {
- restart("Error receiving tweets", e)
- }
- })
+ try {
+ val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
+ newTwitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ store(status)
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) {
+ restart("Error receiving tweets", e)
+ }
+ })
- val query = new FilterQuery
- if (filters.size > 0) {
- query.track(filters.toArray)
- twitterStream.filter(query)
- } else {
- twitterStream.sample()
+ val query = new FilterQuery
+ if (filters.size > 0) {
+ query.track(filters.toArray)
+ newTwitterStream.filter(query)
+ } else {
+ newTwitterStream.sample()
+ }
+ setTwitterStream(newTwitterStream)
+ logInfo("Twitter receiver started")
+ } catch {
+ case e: Exception => restart("Error starting Twitter stream", e)
}
- logInfo("Twitter receiver started")
}
def onStop() {
- twitterStream.shutdown()
+ setTwitterStream(null)
logInfo("Twitter receiver stopped")
}
+
+ private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized {
+ if (twitterStream != null) {
+ twitterStream.shutdown()
+ }
+ twitterStream = newTwitterStream
+ }
}