diff options
author | zsxwing <zsxwing@gmail.com> | 2015-08-31 12:19:11 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-31 12:19:11 -0700 |
commit | 4a5fe091658b1d06f427e404a11a84fc84f953c5 (patch) | |
tree | 42f7a7060d5d4c62302824c7b0f329efc7bce01b /streaming/src/test | |
parent | 72f6dbf7b0c8b271f5f9c762374422c69c8ab43d (diff) | |
download | spark-4a5fe091658b1d06f427e404a11a84fc84f953c5.tar.gz spark-4a5fe091658b1d06f427e404a11a84fc84f953c5.tar.bz2 spark-4a5fe091658b1d06f427e404a11a84fc84f953c5.zip |
[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregisterReceivering since we may reuse it later
`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it.
Author: zsxwing <zsxwing@gmail.com>
Closes #8538 from zsxwing/SPARK-10369.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index dd292ba4dd..45138b748e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase { } } } + + test("should restart receiver after stopping it") { + withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + @volatile var startTimes = 0 + ssc.addStreamingListener(new StreamingListener { + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { + startTimes += 1 + } + }) + val input = ssc.receiverStream(new StoppableReceiver) + val output = new TestOutputStream(input) + output.register() + ssc.start() + StoppableReceiver.shouldStop = true + eventually(timeout(10 seconds), interval(10 millis)) { + // The receiver is stopped once, so if it's restarted, it should be started twice. + assert(startTimes === 2) + } + } + } } /** An input DStream with for testing rate controlling */ @@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver { def getActive(): Option[RateTestReceiver] = Option(activeReceiver) } + +/** + * A custom receiver that could be stopped via StoppableReceiver.shouldStop + */ +class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) { + + var receivingThreadOption: Option[Thread] = None + + def onStart() { + val thread = new Thread() { + override def run() { + while (!StoppableReceiver.shouldStop) { + Thread.sleep(10) + } + StoppableReceiver.this.stop("stop") + } + } + thread.start() + } + + def onStop() { + StoppableReceiver.shouldStop = true + receivingThreadOption.foreach(_.join()) + // Reset it so as to restart it + StoppableReceiver.shouldStop = false + } +} + +object StoppableReceiver { + @volatile var shouldStop = false +} |