aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala32
1 files changed, 31 insertions, 1 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 2e4ac7cfbf..e3a5bdcd24 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
import org.apache.spark.streaming.flume.sink._
+import org.apache.spark.util.Utils
class FlumePollingStreamSuite extends TestSuiteBase {
@@ -45,8 +46,37 @@ class FlumePollingStreamSuite extends TestSuiteBase {
val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch
val channelCapacity = 5000
+ val maxAttempts = 5
test("flume polling test") {
+ testMultipleTimes(testFlumePolling)
+ }
+
+ test("flume polling test multiple hosts") {
+ testMultipleTimes(testFlumePollingMultipleHost)
+ }
+
+ /**
+ * Run the given test until no more java.net.BindException's are thrown.
+ * Do this only up to a certain attempt limit.
+ */
+ private def testMultipleTimes(test: () => Unit): Unit = {
+ var testPassed = false
+ var attempt = 0
+ while (!testPassed && attempt < maxAttempts) {
+ try {
+ test()
+ testPassed = true
+ } catch {
+ case e: Exception if Utils.isBindCollision(e) =>
+ logWarning("Exception when running flume polling test: " + e)
+ attempt += 1
+ }
+ }
+ assert(testPassed, s"Test failed after $attempt attempts!")
+ }
+
+ private def testFlumePolling(): Unit = {
val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
@@ -80,7 +110,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
channel.stop()
}
- test("flume polling test multiple hosts") {
+ private def testFlumePollingMultipleHost(): Unit = {
val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)