aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-04-24 21:34:37 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-04-24 21:34:37 -0700
commitcd12dd9bde91303d0341180e5f70d2a03d6b65b6 (patch)
tree5e9d6510edaa1222e09355c239925cc7559c817d /streaming/src/test
parent968c0187a12f5ae4a696c02c1ff088e998ed7edd (diff)
downloadspark-cd12dd9bde91303d0341180e5f70d2a03d6b65b6.tar.gz
spark-cd12dd9bde91303d0341180e5f70d2a03d6b65b6.tar.bz2
spark-cd12dd9bde91303d0341180e5f70d2a03d6b65b6.zip
[SPARK-1617] and [SPARK-1618] Improvements to streaming ui and bug fix to socket receiver
1617: These changes expose the receiver state (active or inactive) and last error in the UI 1618: If the socket receiver cannot connect in the first attempt, it should try to restart after a delay. That was broken, as the thread that restarts (hence, stops) the receiver waited on Thread.join on itself! Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #540 from tdas/streaming-ui-fix and squashes the following commits: e469434 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-ui-fix dbddf75 [Tathagata Das] Style fix. 66df1a5 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-ui-fix ad98bc9 [Tathagata Das] Refactored streaming listener to use ReceiverInfo. d7f849c [Tathagata Das] Revert "Moved BatchInfo from streaming.scheduler to streaming.ui" 5c80919 [Tathagata Das] Moved BatchInfo from streaming.scheduler to streaming.ui da244f6 [Tathagata Das] Fixed socket receiver as well as made receiver state and error visible in the streamign UI.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala8
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala15
2 files changed, 14 insertions, 9 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index ff3619a590..303d149d28 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -94,9 +94,13 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
// Verify restarting actually stops and starts the receiver
receiver.restart("restarting", null, 100)
- assert(receiver.isStopped)
- assert(receiver.onStopCalled)
+ eventually(timeout(50 millis), interval(10 millis)) {
+ // receiver will be stopped async
+ assert(receiver.isStopped)
+ assert(receiver.onStopCalled)
+ }
eventually(timeout(1000 millis), interval(100 millis)) {
+ // receiver will be started async
assert(receiver.onStartCalled)
assert(executor.isReceiverStarted)
assert(receiver.isStarted)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 458dd3a2b1..ef0efa552c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -66,7 +66,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
test("receiver info reporting") {
val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
- val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
+ val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count)
val collector = new ReceiverInfoCollector
@@ -75,8 +75,8 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
ssc.start()
try {
eventually(timeout(1000 millis), interval(20 millis)) {
- collector.startedReceiverInfo should have size 1
- collector.startedReceiverInfo(0).streamId should equal (0)
+ collector.startedReceiverStreamIds.size should be >= 1
+ collector.startedReceiverStreamIds(0) should equal (0)
collector.stoppedReceiverStreamIds should have size 1
collector.stoppedReceiverStreamIds(0) should equal (0)
collector.receiverErrors should have size 1
@@ -108,20 +108,21 @@ class BatchInfoCollector extends StreamingListener {
/** Listener that collects information on processed batches */
class ReceiverInfoCollector extends StreamingListener {
- val startedReceiverInfo = new ArrayBuffer[ReceiverInfo]
+ val startedReceiverStreamIds = new ArrayBuffer[Int]
val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
val receiverErrors = new ArrayBuffer[(Int, String, String)]()
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
- startedReceiverInfo += receiverStarted.receiverInfo
+ startedReceiverStreamIds += receiverStarted.receiverInfo.streamId
}
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
- stoppedReceiverStreamIds += receiverStopped.streamId
+ stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId
}
override def onReceiverError(receiverError: StreamingListenerReceiverError) {
- receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error))
+ receiverErrors += ((receiverError.receiverInfo.streamId,
+ receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError))
}
}