path: root/sql/core/src/test/scala
author    Tathagata Das <tathagata.das1565@gmail.com>  2017-03-31 10:58:43 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2017-03-31 10:58:43 -0700
commit  567a50acfb0ae26bd430c290348886d494963696 (patch)
tree    b2a7be7b5c14cd5dd82cac3066881cd9069d28b5 /sql/core/src/test/scala
parent  b2349e6a00d569851f0ca91a60e9299306208e92 (diff)
[SPARK-20165][SS] Resolve state encoder's deserializer in driver in FlatMapGroupsWithStateExec
## What changes were proposed in this pull request?

- Encoder's deserializer must be resolved at the driver, where the class is defined. Otherwise there are corner cases with nested classes where resolving at the executor can fail.
- Fixed a flaky test related to processing-time timeout. The flakiness is caused by a race condition between the test thread (which adds data to the memory source) and the streaming query thread. When testing with the manual clock, the goal is to add data and increment the clock together atomically, so that a trigger sees the new data AND the updated clock simultaneously (both or none). This fix adds additional synchronization when adding data: it makes sure that the streaming query thread is waiting on the manual clock to be incremented (so no batch is currently running) before the data is added.
- Added `testQuietly` to some tests that generate a lot of error logs.

## How was this patch tested?

Multiple runs of the existing unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17488 from tdas/SPARK-20165.
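To illustrate the synchronization described above, here is a minimal, self-contained Scala sketch of the idea. It is not the Spark test harness itself: `SimpleManualClock`, `ManualClockSyncSketch`, and the query/test threads below are hypothetical stand-ins for `StreamManualClock`, `StreamExecution`, and the `AddData`/`AdvanceManualClock` test actions.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// A manual clock that records whether a waiter is currently blocked on it,
// playing the role of StreamManualClock in this sketch.
class SimpleManualClock {
  private var timeMillis = 0L
  private var waiting = false

  def getTimeMillis(): Long = synchronized { timeMillis }

  // True if the "query" thread is blocked on this clock at the given time.
  def isStreamWaitingAt(time: Long): Boolean = synchronized {
    waiting && timeMillis == time
  }

  // Called by the query thread: block until the clock reaches targetTime.
  def waitTillTime(targetTime: Long): Unit = synchronized {
    while (timeMillis < targetTime) {
      waiting = true
      wait()
    }
    waiting = false
  }

  // Called by the test thread: advance the clock and wake the query thread.
  def advance(deltaMillis: Long): Unit = synchronized {
    timeMillis += deltaMillis
    notifyAll()
  }
}

object ManualClockSyncSketch {
  def main(args: Array[String]): Unit = {
    val clock = new SimpleManualClock
    val pendingData = new ConcurrentLinkedQueue[String]()

    // Stand-in for the streaming query thread: every "trigger" drains the
    // pending data, then waits for the clock to advance by one second.
    val queryThread = new Thread(() => {
      var nextTrigger = 1000L
      for (_ <- 1 to 2) {
        clock.waitTillTime(nextTrigger)
        val batch = Iterator.continually(pendingData.poll()).takeWhile(_ != null).toList
        println(s"trigger at ${clock.getTimeMillis()} ms processed: ${batch.mkString(", ")}")
        nextTrigger += 1000L
      }
    })
    queryThread.start()

    for (value <- Seq("a", "b")) {
      // "AddData": first wait until the query thread is blocked on the clock,
      // so the data deterministically lands in the next triggered batch ...
      while (!clock.isStreamWaitingAt(clock.getTimeMillis())) Thread.sleep(10)
      pendingData.add(value)
      // ... then "AdvanceManualClock(1 * 1000)" releases exactly one trigger.
      clock.advance(1000)
    }
    queryThread.join()
  }
}
```

The key invariant in this sketch is that data is only enqueued while the query thread is provably blocked on the clock, so each clock advance releases exactly one trigger that sees both the new data and the new time; this is the same property the real test harness enforces before `AdvanceManualClock`.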
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala       |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala |  7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala                 |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                  | 23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala         |  2
5 files changed, 29 insertions, 9 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f705da3d6a..171877abe6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
- test("max files per trigger - incorrect values") {
+ testQuietly("max files per trigger - incorrect values") {
val testTable = "maxFilesPerTrigger_test"
withTable(testTable) {
withTempDir { case src =>
@@ -1326,7 +1326,7 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
import testImplicits._
- test("file source stress test") {
+ testQuietly("file source stress test") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index a00a1a582a..c8e31e3ca2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -21,6 +21,8 @@ import java.sql.Date
import java.util.concurrent.ConcurrentHashMap
import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.apache.spark.SparkException
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
assertNumStateRows(total = 1, updated = 2),
StopStream,
- StartStream(ProcessingTime("1 second"), triggerClock = clock),
- AdvanceManualClock(10 * 1000),
+ StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
AddData(inputData, "c"),
- AdvanceManualClock(1 * 1000),
+ AdvanceManualClock(11 * 1000),
CheckLastBatch(("b", "-1"), ("c", "1")),
assertNumStateRows(total = 1, updated = 2),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 32920f6dfa..388f15405e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
CheckAnswer((1, 2), (2, 2), (3, 2)))
}
- test("recover from a Spark v2.1 checkpoint") {
+ testQuietly("recover from a Spark v2.1 checkpoint") {
var inputData: MemoryStream[Int] = null
var query: DataStreamWriter[Row] = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 8cf1791336..951ff2ca0d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -488,8 +488,27 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
case a: AddData =>
try {
- // Add data and get the source where it was added, and the expected offset of the
- // added data.
+
+ // If the query is running with manual clock, then wait for the stream execution
+ // thread to start waiting for the clock to increment. This is needed so that we
+ // are adding data when there is no trigger that is active. This would ensure that
+ // the data gets deterministically added to the next batch triggered after the manual
+ clock is incremented in the following AdvanceManualClock. This avoids race conditions
+ // between the test thread and the stream execution thread in tests using manual
+ // clock.
+ if (currentStream != null &&
+ currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+ val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+ eventually("Error while synchronizing with manual clock before adding data") {
+ if (currentStream.isActive) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (!currentStream.isActive) {
+ failTest("Query terminated while synchronizing with manual clock")
+ }
+ }
+ // Add data
val queryToUse = Option(currentStream).orElse(Option(lastStream))
val (source, offset) = a.addData(queryToUse)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3f41ecdb7f..1172531fe9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}
- test("StreamingQuery should be Serializable but cannot be used in executors") {
+ testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
ds.writeStream
.queryName(queryName)