aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-08-06 16:34:53 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-08-20 18:33:16 -0700
commit3f91e9dc2563f3c5c473c781bd3078cc620ff880 (patch)
treeb32f122db42f0e71a035e78f118a3577c918134a /external
parenteba399b3c6768f5106cbc17752630fa81d9cdce4 (diff)
downloadspark-3f91e9dc2563f3c5c473c781bd3078cc620ff880.tar.gz
spark-3f91e9dc2563f3c5c473c781bd3078cc620ff880.tar.bz2
spark-3f91e9dc2563f3c5c473c781bd3078cc620ff880.zip
[HOTFIX][Streaming] Handle port collisions in flume polling test
This is failing my tests in #1777. @tdas Author: Andrew Or <andrewor14@gmail.com> Closes #1803 from andrewor14/fix-flaky-streaming-test and squashes the following commits: ea11a03 [Andrew Or] Catch all exceptions caused by BindExceptions 54a0ca0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-flaky-streaming-test 664095c [Andrew Or] Tone down bind exception message af3ddc9 [Andrew Or] Handle port collisions in flume polling test
Diffstat (limited to 'external')
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala32
1 files changed, 31 insertions, 1 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 2e4ac7cfbf..e3a5bdcd24 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
import org.apache.spark.streaming.flume.sink._
+import org.apache.spark.util.Utils
class FlumePollingStreamSuite extends TestSuiteBase {
@@ -45,8 +46,37 @@ class FlumePollingStreamSuite extends TestSuiteBase {
val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch
val channelCapacity = 5000
+ val maxAttempts = 5
test("flume polling test") {
+ testMultipleTimes(testFlumePolling)
+ }
+
+ test("flume polling test multiple hosts") {
+ testMultipleTimes(testFlumePollingMultipleHost)
+ }
+
+ /**
+ * Run the given test until no more java.net.BindException's are thrown.
+ * Do this only up to a certain attempt limit.
+ */
+ private def testMultipleTimes(test: () => Unit): Unit = {
+ var testPassed = false
+ var attempt = 0
+ while (!testPassed && attempt < maxAttempts) {
+ try {
+ test()
+ testPassed = true
+ } catch {
+ case e: Exception if Utils.isBindCollision(e) =>
+ logWarning("Exception when running flume polling test: " + e)
+ attempt += 1
+ }
+ }
+ assert(testPassed, s"Test failed after $attempt attempts!")
+ }
+
+ private def testFlumePolling(): Unit = {
val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
@@ -80,7 +110,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
channel.stop()
}
- test("flume polling test multiple hosts") {
+ private def testFlumePollingMultipleHost(): Unit = {
val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)