aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-03-10 14:38:19 -0800
committerYin Huai <yhuai@databricks.com>2016-03-10 14:38:19 -0800
commit3d2b6f56e38ce867ae8819752fd693adab9a8cc9 (patch)
treee99cbd06981e1fe68aab67a52138f2666bebb8ad /sql/core/src
parent747d2f5381b3ef0b2663ca7def42d3dc43ee13d7 (diff)
downloadspark-3d2b6f56e38ce867ae8819752fd693adab9a8cc9.tar.gz
spark-3d2b6f56e38ce867ae8819752fd693adab9a8cc9.tar.bz2
spark-3d2b6f56e38ce867ae8819752fd693adab9a8cc9.zip
[SQL][TEST] Increased timeouts to reduce flakiness in ContinuousQueryManagerSuite
## What changes were proposed in this pull request? ContinuousQueryManager is sometimes flaky on Jenkins. I could not reproduce it on my machine, so I guess it about the waiting times which causes problems if Jenkins is loaded. I have increased the wait time in the hope that it will be less flaky. ## How was this patch tested? I reran the unit test many times on a loop in my machine. I am going to run it a few time in Jenkins, that's the real test. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #11638 from tdas/cqm-flaky-test.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala30
1 files changed, 15 insertions, 15 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index 35bb9fdbfd..45e824ad63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -148,9 +148,9 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
// awaitAnyTermination should be blocking or non-blocking depending on timeout values
testAwaitAnyTermination(
ExpectBlocked,
- awaitTimeout = 2 seconds,
+ awaitTimeout = 4 seconds,
expectedReturnedValue = false,
- testBehaviorFor = 1 second)
+ testBehaviorFor = 2 seconds)
testAwaitAnyTermination(
ExpectNotBlocked,
@@ -162,20 +162,20 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
testAwaitAnyTermination(
ExpectNotBlocked,
- awaitTimeout = 1 second,
+ awaitTimeout = 2 seconds,
expectedReturnedValue = true,
- testBehaviorFor = 2 seconds)
+ testBehaviorFor = 4 seconds)
require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
// All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high
testAwaitAnyTermination(
- ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
+ ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true)
// Resetting termination should make awaitAnyTermination() blocking again
sqlContext.streams.resetTerminated()
testAwaitAnyTermination(
ExpectBlocked,
- awaitTimeout = 2 seconds,
+ awaitTimeout = 4 seconds,
expectedReturnedValue = false,
testBehaviorFor = 1 second)
@@ -184,31 +184,31 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
testAwaitAnyTermination(
ExpectException[SparkException],
- awaitTimeout = 1 second,
+ awaitTimeout = 1 seconds,
testBehaviorFor = 2 seconds)
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
// All subsequent calls to awaitAnyTermination should throw the exception
testAwaitAnyTermination(
ExpectException[SparkException],
- awaitTimeout = 1 second,
- testBehaviorFor = 2 seconds)
+ awaitTimeout = 2 seconds,
+ testBehaviorFor = 4 seconds)
// Terminate a query asynchronously outside the timeout, awaitAnyTerm should be blocked
sqlContext.streams.resetTerminated()
- val q3 = stopRandomQueryAsync(1 second, withError = true)
+ val q3 = stopRandomQueryAsync(2 seconds, withError = true)
testAwaitAnyTermination(
ExpectNotBlocked,
awaitTimeout = 100 milliseconds,
expectedReturnedValue = false,
- testBehaviorFor = 2 seconds)
+ testBehaviorFor = 4 seconds)
// After that query is stopped, awaitAnyTerm should throw exception
eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 100 milliseconds,
- testBehaviorFor = 2 seconds)
+ testBehaviorFor = 4 seconds)
// Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
@@ -217,12 +217,12 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
testAwaitAnyTermination(
- ExpectNotBlocked, awaitTimeout = 1 second, expectedReturnedValue = true)
+ ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
require(!q4.isActive)
val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
// After q5 terminates with exception, awaitAnyTerm should start throwing exception
- testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 100 milliseconds)
+ testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds)
}
}
@@ -260,7 +260,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
expectedBehavior: ExpectedBehavior,
expectedReturnedValue: Boolean = false,
awaitTimeout: Span = null,
- testBehaviorFor: Span = 2 seconds
+ testBehaviorFor: Span = 4 seconds
): Unit = {
def awaitTermFunc(): Unit = {