diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-08-06 16:34:53 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-08-06 16:34:53 -0700 |
commit | c6889d2cb9cd99f7e3e0ee14a4fdf301f1f9810e (patch) | |
tree | d416aa00bed706d6099bcec7ba4cb499675dcc7d /external | |
parent | e537b33c63d3fb373fe41deaa607d72e76e3906b (diff) | |
download | spark-c6889d2cb9cd99f7e3e0ee14a4fdf301f1f9810e.tar.gz spark-c6889d2cb9cd99f7e3e0ee14a4fdf301f1f9810e.tar.bz2 spark-c6889d2cb9cd99f7e3e0ee14a4fdf301f1f9810e.zip |
[HOTFIX][Streaming] Handle port collisions in flume polling test
This is failing my tests in #1777. @tdas
Author: Andrew Or <andrewor14@gmail.com>
Closes #1803 from andrewor14/fix-flaky-streaming-test and squashes the following commits:
ea11a03 [Andrew Or] Catch all exceptions caused by BindExceptions
54a0ca0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-flaky-streaming-test
664095c [Andrew Or] Tone down bind exception message
af3ddc9 [Andrew Or] Handle port collisions in flume polling test
Diffstat (limited to 'external')
-rw-r--r-- | external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 32 |
1 files changed, 31 insertions, 1 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 27bf2ac962..a69baa1698 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.util.ManualClock import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext} import org.apache.spark.streaming.flume.sink._ +import org.apache.spark.util.Utils class FlumePollingStreamSuite extends TestSuiteBase { @@ -45,8 +46,37 @@ class FlumePollingStreamSuite extends TestSuiteBase { val eventsPerBatch = 100 val totalEventsPerChannel = batchCount * eventsPerBatch val channelCapacity = 5000 + val maxAttempts = 5 test("flume polling test") { + testMultipleTimes(testFlumePolling) + } + + test("flume polling test multiple hosts") { + testMultipleTimes(testFlumePollingMultipleHost) + } + + /** + * Run the given test until no more java.net.BindException's are thrown. + * Do this only up to a certain attempt limit. + */ + private def testMultipleTimes(test: () => Unit): Unit = { + var testPassed = false + var attempt = 0 + while (!testPassed && attempt < maxAttempts) { + try { + test() + testPassed = true + } catch { + case e: Exception if Utils.isBindCollision(e) => + logWarning("Exception when running flume polling test: " + e) + attempt += 1 + } + } + assert(testPassed, s"Test failed after $attempt attempts!") + } + + private def testFlumePolling(): Unit = { val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) @@ -80,7 +110,7 @@ class FlumePollingStreamSuite extends TestSuiteBase { channel.stop() } - test("flume polling test multiple hosts") { + private def testFlumePollingMultipleHost(): Unit = { val testPort = getTestPort // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) |