aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-05-14 04:17:32 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-05-14 04:17:32 -0700
commit68f28dabe9c7679be82e684385be216319beb610 (patch)
treeb5906b0ebf7c5397a4ef058117f3d4978fc5bb29 /external
parent69f750228f3ec8537a93da08e712596fa8004143 (diff)
downloadspark-68f28dabe9c7679be82e684385be216319beb610.tar.gz
spark-68f28dabe9c7679be82e684385be216319beb610.tar.bz2
spark-68f28dabe9c7679be82e684385be216319beb610.zip
Fixed streaming examples docs to use run-example instead of spark-submit
Pretty self-explanatory Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #722 from tdas/example-fix and squashes the following commits: 7839979 [Tathagata Das] Minor changes. 0673441 [Tathagata Das] Fixed java docs of java streaming example e687123 [Tathagata Das] Fixed scala style errors. 9b8d112 [Tathagata Das] Fixed streaming examples docs to use run-example instead of spark-submit.
Diffstat (limited to 'external')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala58
1 files changed, 35 insertions, 23 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 7bca140711..5ea2e5549d 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -63,36 +63,48 @@ class TwitterReceiver(
storageLevel: StorageLevel
) extends Receiver[Status](storageLevel) with Logging {
- var twitterStream: TwitterStream = _
+ private var twitterStream: TwitterStream = _
def onStart() {
- twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
- twitterStream.addListener(new StatusListener {
- def onStatus(status: Status) = {
- store(status)
- }
- // Unimplemented
- def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
- def onTrackLimitationNotice(i: Int) {}
- def onScrubGeo(l: Long, l1: Long) {}
- def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) {
- restart("Error receiving tweets", e)
- }
- })
+ try {
+ val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
+ newTwitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ store(status)
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) {
+ restart("Error receiving tweets", e)
+ }
+ })
- val query = new FilterQuery
- if (filters.size > 0) {
- query.track(filters.toArray)
- twitterStream.filter(query)
- } else {
- twitterStream.sample()
+ val query = new FilterQuery
+ if (filters.size > 0) {
+ query.track(filters.toArray)
+ newTwitterStream.filter(query)
+ } else {
+ newTwitterStream.sample()
+ }
+ setTwitterStream(newTwitterStream)
+ logInfo("Twitter receiver started")
+ } catch {
+ case e: Exception => restart("Error starting Twitter stream", e)
}
- logInfo("Twitter receiver started")
}
def onStop() {
- twitterStream.shutdown()
+ setTwitterStream(null)
logInfo("Twitter receiver stopped")
}
+
+ private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized {
+ if (twitterStream != null) {
+ twitterStream.shutdown()
+ }
+ twitterStream = newTwitterStream
+ }
}