aboutsummaryrefslogtreecommitdiff
path: root/external/twitter/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-07-24 15:59:09 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-07-24 16:00:49 -0700
commit53b4e0f95750f371db14f8da4b5c4a1c38301710 (patch)
tree2bfc28ac6024c3c8e5310aeba4393033f0084d86 /external/twitter/src
parent91241592a4e8eb6ca9e0c7ff81b1eb1d100377aa (diff)
downloadspark-53b4e0f95750f371db14f8da4b5c4a1c38301710.tar.gz
spark-53b4e0f95750f371db14f8da4b5c4a1c38301710.tar.bz2
spark-53b4e0f95750f371db14f8da4b5c4a1c38301710.zip
[SPARK-2464][Streaming] Fixed Twitter stream stopping bug
Stopping the Twitter Receiver would call twitter4j's TwitterStream.shutdown, which in turn causes an Exception to be thrown to the listener. This exception caused the Receiver to be restarted. This patch check whether the receiver was stopped or not, and accordingly restarts on exception. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #1577 from tdas/twitter-stop and squashes the following commits: 011b525 [Tathagata Das] Fixed Twitter stream stopping bug. (cherry picked from commit a45d5480f65d2e969fc7fbd8f358b1717fb99bef) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'external/twitter/src')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala9
1 files changed, 7 insertions, 2 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 5ea2e5549d..4eacc47da5 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -63,7 +63,8 @@ class TwitterReceiver(
storageLevel: StorageLevel
) extends Receiver[Status](storageLevel) with Logging {
- private var twitterStream: TwitterStream = _
+ @volatile private var twitterStream: TwitterStream = _
+ @volatile private var stopped = false
def onStart() {
try {
@@ -78,7 +79,9 @@ class TwitterReceiver(
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
def onException(e: Exception) {
- restart("Error receiving tweets", e)
+ if (!stopped) {
+ restart("Error receiving tweets", e)
+ }
}
})
@@ -91,12 +94,14 @@ class TwitterReceiver(
}
setTwitterStream(newTwitterStream)
logInfo("Twitter receiver started")
+ stopped = false
} catch {
case e: Exception => restart("Error starting Twitter stream", e)
}
}
def onStop() {
+ stopped = true
setTwitterStream(null)
logInfo("Twitter receiver stopped")
}