diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-24 21:34:37 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-24 21:34:37 -0700 |
commit | cd12dd9bde91303d0341180e5f70d2a03d6b65b6 (patch) | |
tree | 5e9d6510edaa1222e09355c239925cc7559c817d /streaming/src/test | |
parent | 968c0187a12f5ae4a696c02c1ff088e998ed7edd (diff) | |
download | spark-cd12dd9bde91303d0341180e5f70d2a03d6b65b6.tar.gz spark-cd12dd9bde91303d0341180e5f70d2a03d6b65b6.tar.bz2 spark-cd12dd9bde91303d0341180e5f70d2a03d6b65b6.zip |
[SPARK-1617] and [SPARK-1618] Improvements to streaming ui and bug fix to socket receiver
1617: These changes expose the receiver state (active or inactive) and last error in the UI
1618: If the socket receiver cannot connect in the first attempt, it should try to restart after a delay. That was broken, as the thread that restarts (hence, stops) the receiver waited on Thread.join on itself!
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #540 from tdas/streaming-ui-fix and squashes the following commits:
e469434 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-ui-fix
dbddf75 [Tathagata Das] Style fix.
66df1a5 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-ui-fix
ad98bc9 [Tathagata Das] Refactored streaming listener to use ReceiverInfo.
d7f849c [Tathagata Das] Revert "Moved BatchInfo from streaming.scheduler to streaming.ui"
5c80919 [Tathagata Das] Moved BatchInfo from streaming.scheduler to streaming.ui
da244f6 [Tathagata Das] Fixed socket receiver as well as made receiver state and error visible in the streamign UI.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala | 8 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 15 |
2 files changed, 14 insertions, 9 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala index ff3619a590..303d149d28 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala @@ -94,9 +94,13 @@ class NetworkReceiverSuite extends FunSuite with Timeouts { // Verify restarting actually stops and starts the receiver receiver.restart("restarting", null, 100) - assert(receiver.isStopped) - assert(receiver.onStopCalled) + eventually(timeout(50 millis), interval(10 millis)) { + // receiver will be stopped async + assert(receiver.isStopped) + assert(receiver.onStopCalled) + } eventually(timeout(1000 millis), interval(100 millis)) { + // receiver will be started async assert(receiver.onStartCalled) assert(executor.isReceiverStarted) assert(receiver.isStarted) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 458dd3a2b1..ef0efa552c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -66,7 +66,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { test("receiver info reporting") { val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000)) - val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver) + val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver) inputStream.foreachRDD(_.count) val collector = new ReceiverInfoCollector @@ -75,8 +75,8 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { ssc.start() try { eventually(timeout(1000 millis), interval(20 millis)) { - collector.startedReceiverInfo should have size 1 - collector.startedReceiverInfo(0).streamId should equal (0) + collector.startedReceiverStreamIds.size should be >= 1 + collector.startedReceiverStreamIds(0) should equal (0) collector.stoppedReceiverStreamIds should have size 1 collector.stoppedReceiverStreamIds(0) should equal (0) collector.receiverErrors should have size 1 @@ -108,20 +108,21 @@ class BatchInfoCollector extends StreamingListener { /** Listener that collects information on processed batches */ class ReceiverInfoCollector extends StreamingListener { - val startedReceiverInfo = new ArrayBuffer[ReceiverInfo] + val startedReceiverStreamIds = new ArrayBuffer[Int] val stoppedReceiverStreamIds = new ArrayBuffer[Int]() val receiverErrors = new ArrayBuffer[(Int, String, String)]() override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { - startedReceiverInfo += receiverStarted.receiverInfo + startedReceiverStreamIds += receiverStarted.receiverInfo.streamId } override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { - stoppedReceiverStreamIds += receiverStopped.streamId + stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId } override def onReceiverError(receiverError: StreamingListenerReceiverError) { - receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error)) + receiverErrors += ((receiverError.receiverInfo.streamId, + receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError)) } } |