aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpetermaxlee <petermaxlee@gmail.com>2016-09-18 15:22:01 -0700
committerReynold Xin <rxin@databricks.com>2016-09-18 15:22:01 -0700
commit8f0c35a4d0dd458719627be5f524792bf244d70a (patch)
tree5f953e797c1c75d192df4178952f8ad90dc6d0d7
parent1dbb725dbef30bf7633584ce8efdb573f2d92bca (diff)
downloadspark-8f0c35a4d0dd458719627be5f524792bf244d70a.tar.gz
spark-8f0c35a4d0dd458719627be5f524792bf244d70a.tar.bz2
spark-8f0c35a4d0dd458719627be5f524792bf244d70a.zip
[SPARK-17571][SQL] AssertOnQuery.condition should always return Boolean value
## What changes were proposed in this pull request? AssertOnQuery has two apply constructor: one that accepts a closure that returns boolean, and another that accepts a closure that returns Unit. This is actually very confusing because developers could mistakenly think that AssertOnQuery always require a boolean return type and verifies the return result, when indeed the value of the last statement is ignored in one of the constructors. This pull request makes the two constructor consistent and always require boolean value. It will overall make the test suites more robust against developer errors. As an evidence for the confusing behavior, this change also identified a bug with an existing test case due to file system time granularity. This pull request fixes that test case as well. ## How was this patch tested? This is a test only change. Author: petermaxlee <petermaxlee@gmail.com> Closes #15127 from petermaxlee/SPARK-17571.
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala3
3 files changed, 10 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 886f7be59d..a02a36c004 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -354,7 +354,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
CheckAnswer("a", "b"),
// SLeeps longer than 5ms (maxFileAge)
- AssertOnQuery { _ => Thread.sleep(10); true },
+ // Unfortunately since a lot of file system does not have modification time granularity
+ // finer grained than 1 sec, we need to use 1 sec here.
+ AssertOnQuery { _ => Thread.sleep(1000); true },
AddTextFileData("c\nd", src, tmp),
CheckAnswer("a", "b", "c", "d"),
@@ -363,7 +365,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
e.source.asInstanceOf[FileStreamSource]
}.head
- source.seenFiles.size == 1
+ assert(source.seenFiles.size == 1)
+ true
}
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index af2b58116b..6c5b170d9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -188,8 +188,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
new AssertOnQuery(condition, message)
}
- def apply(message: String)(condition: StreamExecution => Unit): AssertOnQuery = {
- new AssertOnQuery(s => { condition(s); true }, message)
+ def apply(message: String)(condition: StreamExecution => Boolean): AssertOnQuery = {
+ new AssertOnQuery(condition, message)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 77602e8167..831543a474 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -66,6 +66,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// No progress events or termination events
assert(listener.progressStatuses.isEmpty)
assert(listener.terminationStatus === null)
+ true
},
AddDataMemory(input, Seq(1, 2, 3)),
CheckAnswer(1, 2, 3),
@@ -84,6 +85,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// No termination events
assert(listener.terminationStatus === null)
}
+ true
},
StopStream,
AssertOnQuery("Incorrect query status in onQueryTerminated") { query =>
@@ -97,6 +99,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(listener.terminationException === None)
}
listener.checkAsyncErrors()
+ true
}
)
}