aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala3
3 files changed, 10 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 886f7be59d..a02a36c004 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -354,7 +354,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
CheckAnswer("a", "b"),
// SLeeps longer than 5ms (maxFileAge)
- AssertOnQuery { _ => Thread.sleep(10); true },
+ // Unfortunately since a lot of file system does not have modification time granularity
+ // finer grained than 1 sec, we need to use 1 sec here.
+ AssertOnQuery { _ => Thread.sleep(1000); true },
AddTextFileData("c\nd", src, tmp),
CheckAnswer("a", "b", "c", "d"),
@@ -363,7 +365,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
e.source.asInstanceOf[FileStreamSource]
}.head
- source.seenFiles.size == 1
+ assert(source.seenFiles.size == 1)
+ true
}
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index af2b58116b..6c5b170d9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -188,8 +188,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
new AssertOnQuery(condition, message)
}
- def apply(message: String)(condition: StreamExecution => Unit): AssertOnQuery = {
- new AssertOnQuery(s => { condition(s); true }, message)
+ def apply(message: String)(condition: StreamExecution => Boolean): AssertOnQuery = {
+ new AssertOnQuery(condition, message)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 77602e8167..831543a474 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -66,6 +66,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// No progress events or termination events
assert(listener.progressStatuses.isEmpty)
assert(listener.terminationStatus === null)
+ true
},
AddDataMemory(input, Seq(1, 2, 3)),
CheckAnswer(1, 2, 3),
@@ -84,6 +85,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// No termination events
assert(listener.terminationStatus === null)
}
+ true
},
StopStream,
AssertOnQuery("Incorrect query status in onQueryTerminated") { query =>
@@ -97,6 +99,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(listener.terminationException === None)
}
listener.checkAsyncErrors()
+ true
}
)
}