diff options
Diffstat (limited to 'external')
-rw-r--r-- | external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala | 58 |
1 files changed, 35 insertions, 23 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 7bca140711..5ea2e5549d 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -63,36 +63,48 @@ class TwitterReceiver( storageLevel: StorageLevel ) extends Receiver[Status](storageLevel) with Logging { - var twitterStream: TwitterStream = _ + private var twitterStream: TwitterStream = _ def onStart() { - twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) - twitterStream.addListener(new StatusListener { - def onStatus(status: Status) = { - store(status) - } - // Unimplemented - def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} - def onTrackLimitationNotice(i: Int) {} - def onScrubGeo(l: Long, l1: Long) {} - def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) { - restart("Error receiving tweets", e) - } - }) + try { + val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth) + newTwitterStream.addListener(new StatusListener { + def onStatus(status: Status) = { + store(status) + } + // Unimplemented + def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} + def onTrackLimitationNotice(i: Int) {} + def onScrubGeo(l: Long, l1: Long) {} + def onStallWarning(stallWarning: StallWarning) {} + def onException(e: Exception) { + restart("Error receiving tweets", e) + } + }) - val query = new FilterQuery - if (filters.size > 0) { - query.track(filters.toArray) - twitterStream.filter(query) - } else { - twitterStream.sample() + val query = new FilterQuery + if (filters.size > 0) { + query.track(filters.toArray) + newTwitterStream.filter(query) + } else { + newTwitterStream.sample() + } + setTwitterStream(newTwitterStream) + logInfo("Twitter receiver started") + } catch { + case e: Exception => restart("Error starting Twitter stream", e) } - logInfo("Twitter receiver started") } def onStop() { - twitterStream.shutdown() + setTwitterStream(null) logInfo("Twitter receiver stopped") } + + private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized { + if (twitterStream != null) { + twitterStream.shutdown() + } + twitterStream = newTwitterStream + } } |