aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
Diffstat (limited to 'external')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala9
1 files changed, 7 insertions, 2 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 5ea2e5549d..4eacc47da5 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -63,7 +63,8 @@ class TwitterReceiver(
storageLevel: StorageLevel
) extends Receiver[Status](storageLevel) with Logging {
- private var twitterStream: TwitterStream = _
+ @volatile private var twitterStream: TwitterStream = _
+ @volatile private var stopped = false
def onStart() {
try {
@@ -78,7 +79,9 @@ class TwitterReceiver(
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
def onException(e: Exception) {
- restart("Error receiving tweets", e)
+ if (!stopped) {
+ restart("Error receiving tweets", e)
+ }
}
})
@@ -91,12 +94,14 @@ class TwitterReceiver(
}
setTwitterStream(newTwitterStream)
logInfo("Twitter receiver started")
+ stopped = false
} catch {
case e: Exception => restart("Error starting Twitter stream", e)
}
}
def onStop() {
+ stopped = true
setTwitterStream(null)
logInfo("Twitter receiver stopped")
}