diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-05-14 04:17:32 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-05-14 04:17:32 -0700 |
commit | 68f28dabe9c7679be82e684385be216319beb610 (patch) | |
tree | b5906b0ebf7c5397a4ef058117f3d4978fc5bb29 /external/twitter | |
parent | 69f750228f3ec8537a93da08e712596fa8004143 (diff) | |
download | spark-68f28dabe9c7679be82e684385be216319beb610.tar.gz spark-68f28dabe9c7679be82e684385be216319beb610.tar.bz2 spark-68f28dabe9c7679be82e684385be216319beb610.zip |
Fixed streaming examples docs to use run-example instead of spark-submit
Pretty self-explanatory
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #722 from tdas/example-fix and squashes the following commits:
7839979 [Tathagata Das] Minor changes.
0673441 [Tathagata Das] Fixed java docs of java streaming example
e687123 [Tathagata Das] Fixed scala style errors.
9b8d112 [Tathagata Das] Fixed streaming examples docs to use run-example instead of spark-submit.
Diffstat (limited to 'external/twitter')
-rw-r--r-- | external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala | 58 |
1 files changed, 35 insertions, 23 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 7bca140711..5ea2e5549d 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -63,36 +63,48 @@ class TwitterReceiver( storageLevel: StorageLevel ) extends Receiver[Status](storageLevel) with Logging { - var twitterStream: TwitterStream = _ + private var twitterStream: TwitterStream = _ def onStart() { - twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) - twitterStream.addListener(new StatusListener { - def onStatus(status: Status) = { - store(status) - } - // Unimplemented - def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} - def onTrackLimitationNotice(i: Int) {} - def onScrubGeo(l: Long, l1: Long) {} - def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) { - restart("Error receiving tweets", e) - } - }) + try { + val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth) + newTwitterStream.addListener(new StatusListener { + def onStatus(status: Status) = { + store(status) + } + // Unimplemented + def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} + def onTrackLimitationNotice(i: Int) {} + def onScrubGeo(l: Long, l1: Long) {} + def onStallWarning(stallWarning: StallWarning) {} + def onException(e: Exception) { + restart("Error receiving tweets", e) + } + }) - val query = new FilterQuery - if (filters.size > 0) { - query.track(filters.toArray) - twitterStream.filter(query) - } else { - twitterStream.sample() + val query = new FilterQuery + if (filters.size > 0) { + query.track(filters.toArray) + newTwitterStream.filter(query) + } else { + newTwitterStream.sample() + } + setTwitterStream(newTwitterStream) + logInfo("Twitter receiver started") + } catch { + case e: Exception => restart("Error starting Twitter stream", e) } - logInfo("Twitter receiver started") } def onStop() { - twitterStream.shutdown() + setTwitterStream(null) logInfo("Twitter receiver stopped") } + + private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized { + if (twitterStream != null) { + twitterStream.shutdown() + } + twitterStream = newTwitterStream + } } |