aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-08-31 12:19:11 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-31 12:19:11 -0700
commit4a5fe091658b1d06f427e404a11a84fc84f953c5 (patch)
tree42f7a7060d5d4c62302824c7b0f329efc7bce01b /streaming/src/test
parent72f6dbf7b0c8b271f5f9c762374422c69c8ab43d (diff)
downloadspark-4a5fe091658b1d06f427e404a11a84fc84f953c5.tar.gz
spark-4a5fe091658b1d06f427e404a11a84fc84f953c5.tar.bz2
spark-4a5fe091658b1d06f427e404a11a84fc84f953c5.zip
[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregisterReceivering since we may reuse it later
`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it. Author: zsxwing <zsxwing@gmail.com> Closes #8538 from zsxwing/SPARK-10369.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala51
1 files changed, 51 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index dd292ba4dd..45138b748e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}
}
+
+ test("should restart receiver after stopping it") {
+ withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+ @volatile var startTimes = 0
+ ssc.addStreamingListener(new StreamingListener {
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+ startTimes += 1
+ }
+ })
+ val input = ssc.receiverStream(new StoppableReceiver)
+ val output = new TestOutputStream(input)
+ output.register()
+ ssc.start()
+ StoppableReceiver.shouldStop = true
+ eventually(timeout(10 seconds), interval(10 millis)) {
+ // The receiver is stopped once, so if it's restarted, it should be started twice.
+ assert(startTimes === 2)
+ }
+ }
+ }
}
/** An input DStream with for testing rate controlling */
@@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {
def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
}
+
+/**
+ * A custom receiver that could be stopped via StoppableReceiver.shouldStop
+ */
+class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+ var receivingThreadOption: Option[Thread] = None
+
+ def onStart() {
+ val thread = new Thread() {
+ override def run() {
+ while (!StoppableReceiver.shouldStop) {
+ Thread.sleep(10)
+ }
+ StoppableReceiver.this.stop("stop")
+ }
+ }
+ thread.start()
+ }
+
+ def onStop() {
+ StoppableReceiver.shouldStop = true
+ receivingThreadOption.foreach(_.join())
+ // Reset it so as to restart it
+ StoppableReceiver.shouldStop = false
+ }
+}
+
+object StoppableReceiver {
+ @volatile var shouldStop = false
+}