aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt/src/test
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-02-02 14:00:33 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-02-02 14:00:33 -0800
commite908322cd5991e6cbdaaafb8cd494759dac01225 (patch)
tree66ce728fd4de2216dea0009eac5fc5c783a1cec5 /external/mqtt/src/test
parent683e938242e29a0d584452e5230b4168b85bdab2 (diff)
downloadspark-e908322cd5991e6cbdaaafb8cd494759dac01225.tar.gz
spark-e908322cd5991e6cbdaaafb8cd494759dac01225.tar.bz2
spark-e908322cd5991e6cbdaaafb8cd494759dac01225.zip
[SPARK-4631][streaming][FIX] Wait for a receiver to start before publishing test data.
This fixes two sources of non-deterministic failures in this test: - wait for a receiver to be up before pushing data through MQTT - gracefully handle the case where the MQTT client is overloaded. There’s a hard-coded limit of 10 in-flight messages, and this test may hit it. Instead of crashing, we retry sending the message. Both of these are needed to make the test pass reliably on my machine. Author: Iulian Dragos <jaguarul@gmail.com> Closes #4270 from dragos/issue/fix-flaky-test-SPARK-4631 and squashes the following commits: f66c482 [Iulian Dragos] [SPARK-4631][streaming] Wait for a receiver to start before publishing test data. d408a8e [Iulian Dragos] Install callback before connecting to MQTT broker.
Diffstat (limited to 'external/mqtt/src/test')
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala35
1 files changed, 32 insertions, 3 deletions
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index fe53a29cba..e84adc088a 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.mqtt
import java.net.{URI, ServerSocket}
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -32,6 +34,8 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
@@ -67,7 +71,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
val sendMessage = "MQTT demo for spark streaming"
val receiveStream: ReceiverInputDStream[String] =
MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
- var receiveMessage: List[String] = List()
+ @volatile var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
if (rdd.collect.length > 0) {
receiveMessage = receiveMessage ::: List(rdd.first)
@@ -75,6 +79,11 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
}
}
ssc.start()
+
+ // wait for the receiver to start before publishing data, or we risk failing
+ // the test nondeterministically. See SPARK-4631
+ waitForReceiverToStart()
+
publishData(sendMessage)
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(sendMessage.equals(receiveMessage(0)))
@@ -121,8 +130,14 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)
- for (i <- 0 to 100) {
- msgTopic.publish(message)
+
+ for (i <- 0 to 10) {
+ try {
+ msgTopic.publish(message)
+ } catch {
+ case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
+ Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
+ }
}
}
} finally {
@@ -131,4 +146,18 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
client = null
}
}
+
+ /**
+ * Block until at least one receiver has started or timeout occurs.
+ */
+ private def waitForReceiverToStart() = {
+ val latch = new CountDownLatch(1)
+ ssc.addStreamingListener(new StreamingListener {
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
+ latch.countDown()
+ }
+ })
+
+ assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
+ }
}