From 567a50acfb0ae26bd430c290348886d494963696 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 31 Mar 2017 10:58:43 -0700
Subject: [SPARK-20165][SS] Resolve state encoder's deserializer in driver in
 FlatMapGroupsWithStateExec

## What changes were proposed in this pull request?

- The encoder's deserializer must be resolved at the driver, where the class is defined. Otherwise there are corner cases using nested classes where resolving at the executor can fail. (That change is in `FlatMapGroupsWithStateExec` itself, which falls outside the test-only diff shown below; a sketch of the general principle appears at the end of this patch.)

- Fixed a flaky test related to the processing-time timeout. The flakiness is caused by a race condition between the test thread (which adds data to the memory source) and the streaming query thread. When testing with the manual clock, the goal is to add data and increment the clock together atomically, so that a trigger sees the new data AND the updated clock simultaneously (both or none). This fix adds additional synchronization when adding data: it makes sure the streaming query thread is waiting on the manual clock to be incremented (so no batch is currently running) before data is added. (A sketch of this synchronization idea follows the file list below.)

- Added `testQuietly` to some tests that generate a lot of error logs.

## How was this patch tested?

Multiple runs of the existing unit tests.

Author: Tathagata Das

Closes #17488 from tdas/SPARK-20165.
---
 .../sql/streaming/FileStreamSourceSuite.scala      |  4 ++--
 .../streaming/FlatMapGroupsWithStateSuite.scala    |  7 ++++---
 .../apache/spark/sql/streaming/StreamSuite.scala   |  2 +-
 .../apache/spark/sql/streaming/StreamTest.scala    | 23 ++++++++++++++++++++--
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  2 +-
 5 files changed, 29 insertions(+), 9 deletions(-)

(limited to 'sql/core/src/test/scala')
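Before the diffs, here is a minimal, self-contained sketch of the synchronization idea described in the second bullet. This is not Spark's `StreamManualClock`; `ManualClock`, `waitTillTime`, `awaitParked`, and `advance` are hypothetical stand-ins. The "query" thread parks on the clock, and the "test" thread adds data only once the query thread is parked at the current time, then advances the clock, so each trigger sees its data and clock update together:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for StreamManualClock; sketches the idea only.
class ManualClock {
  private var timeMs = 0L
  private var parkedAt: Option[Long] = None  // clock time at which the query thread parked

  // "Query" thread: park until the clock reaches `targetMs`.
  def waitTillTime(targetMs: Long): Long = synchronized {
    while (timeMs < targetMs) {
      parkedAt = Some(timeMs)
      notifyAll()  // let awaitParked() observe that we are parked
      wait()
      parkedAt = None
    }
    timeMs
  }

  // "Test" thread: block until the query thread is parked at the *current*
  // time, i.e. it has fully processed everything up to the last advance.
  def awaitParked(): Unit = synchronized {
    while (!parkedAt.contains(timeMs)) wait()
  }

  // "Test" thread, after adding data: release the next trigger.
  def advance(deltaMs: Long): Unit = synchronized {
    timeMs += deltaMs
    notifyAll()
  }
}

object ManualClockDemo {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock
    val data = new ConcurrentLinkedQueue[String]()

    // "Query" thread: each trigger drains whatever data is visible to it.
    val query = new Thread(new Runnable {
      override def run(): Unit = {
        var next = 1000L
        while (next <= 3000L) {
          clock.waitTillTime(next)
          val batch = Iterator.continually(data.poll()).takeWhile(_ != null).toList
          println(s"trigger at $next ms sees: ${batch.mkString(", ")}")
          next += 1000L
        }
      }
    })
    query.start()

    // "Test" thread: add data only while the query thread is parked, so data
    // and clock increment are observed atomically by the next trigger.
    for (i <- 1 to 3) {
      clock.awaitParked()
      data.add(s"item$i")
      clock.advance(1000L)
    }
    query.join()  // deterministically prints item1, item2, item3, one per trigger
  }
}
```

In the real fix below, `StreamTest` plays the role of the test thread: before `AddData` proceeds, it uses `eventually` to wait until `clock.isStreamWaitingAt(clock.getTimeMillis())` holds, and the subsequent `AdvanceManualClock` releases the trigger.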
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f705da3d6a..171877abe6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger - incorrect values") {
+  testQuietly("max files per trigger - incorrect values") {
     val testTable = "maxFilesPerTrigger_test"
     withTable(testTable) {
       withTempDir { case src =>
@@ -1326,7 +1326,7 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
-  test("file source stress test") {
+  testQuietly("file source stress test") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index a00a1a582a..c8e31e3ca2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -21,6 +21,8 @@ import java.sql.Date
 import java.util.concurrent.ConcurrentHashMap
 
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
       assertNumStateRows(total = 1, updated = 2),
       StopStream,
-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
-      AdvanceManualClock(10 * 1000),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
       AddData(inputData, "c"),
-      AdvanceManualClock(1 * 1000),
+      AdvanceManualClock(11 * 1000),
       CheckLastBatch(("b", "-1"), ("c", "1")),
       assertNumStateRows(total = 1, updated = 2),
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 32920f6dfa..388f15405e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
       CheckAnswer((1, 2), (2, 2), (3, 2)))
   }
 
-  test("recover from a Spark v2.1 checkpoint") {
+  testQuietly("recover from a Spark v2.1 checkpoint") {
     var inputData: MemoryStream[Int] = null
     var query: DataStreamWriter[Row] = null
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 8cf1791336..951ff2ca0d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -488,8 +488,27 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
       case a: AddData =>
         try {
-          // Add data and get the source where it was added, and the expected offset of the
-          // added data.
+
+          // If the query is running with a manual clock, then wait for the stream
+          // execution thread to start waiting for the clock to increment. This is
+          // needed so that we are adding data when there is no active trigger. It
+          // ensures that the data gets deterministically added to the next batch
+          // triggered after the manual clock is incremented in the following
+          // AdvanceManualClock. This avoids race conditions between the test thread
+          // and the stream execution thread in tests using the manual clock.
+          if (currentStream != null &&
+              currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+            val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+            eventually("Error while synchronizing with manual clock before adding data") {
+              if (currentStream.isActive) {
+                assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+              }
+            }
+            if (!currentStream.isActive) {
+              failTest("Query terminated while synchronizing with manual clock")
+            }
+          }
+          // Add data
           val queryToUse = Option(currentStream).orElse(Option(lastStream))
           val (source, offset) = a.addData(queryToUse)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3f41ecdb7f..1172531fe9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar {
     }
   }
 
-  test("StreamingQuery should be Serializable but cannot be used in executors") {
+  testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
     def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
       ds.writeStream
         .queryName(queryName)
--
cgit v1.2.3
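Note that the first bullet's change, resolving the state encoder's deserializer in the driver inside `FlatMapGroupsWithStateExec`, lives outside the `sql/core/src/test/scala` filter, so its hunk is not shown above. A minimal, self-contained sketch of the principle, with hypothetical names (`Encoder`, `resolve`, `runOnExecutor` are illustrative stand-ins, not Spark's internal API):

```scala
// Hypothetical sketch: resolve once on the driver, ship only the result.
object DriverSideResolutionSketch {

  // A stand-in for an encoder whose deserializer must be "resolved" in the
  // scope where the target class is defined and loadable.
  final case class Encoder[T](resolve: () => String => T)

  // Pretend stand-in for shipping a closure to executors; here it just maps.
  def runOnExecutor[A, B](f: A => B, input: Seq[A]): Seq[B] = input.map(f)

  def main(args: Array[String]): Unit = {
    // A nested class, like the corner case mentioned in the PR description.
    case class State(len: Int)
    val stateEncoder = Encoder[State](() => s => State(s.length))

    // Resolve eagerly on the "driver": the result is a plain function value,
    // so executors never have to re-run the resolution step themselves.
    val deserializer: String => State = stateEncoder.resolve()

    println(runOnExecutor(deserializer, Seq("a", "bb", "ccc")))
    // prints: List(State(1), State(2), State(3))
  }
}
```

The point is that resolution happens exactly once, in the scope where the class is loadable, and only the resulting closure is serialized to executors; this mirrors resolving the deserializer eagerly on the driver rather than lazily inside the executor task.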