aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-05-02 11:28:21 -0700
committerMichael Armbrust <michael@databricks.com>2016-05-02 11:28:21 -0700
commita35a67a83dbb450d26ce0d142ab106e952670842 (patch)
tree5db377d5459b8fa799e15a27c30569648ec3174f /sql
parent99274418684ebae5b98d15b4686b95c1ac029e94 (diff)
downloadspark-a35a67a83dbb450d26ce0d142ab106e952670842.tar.gz
spark-a35a67a83dbb450d26ce0d142ab106e952670842.tar.bz2
spark-a35a67a83dbb450d26ce0d142ab106e952670842.zip
[SPARK-14579][SQL] Fix the race condition in StreamExecution.processAllAvailable again
## What changes were proposed in this pull request? #12339 didn't fix the race condition. MemorySinkSuite is still flaky: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.2/814/testReport/junit/org.apache.spark.sql.streaming/MemorySinkSuite/registering_as_a_table/ Here is an execution order to reproduce it. | Time |Thread 1 | MicroBatchThread | |:-------------:|:-------------:|:-----:| | 1 | | `MemorySink.getOffset` | | 2 | | availableOffsets ++= newData (availableOffsets is not changed here) | | 3 | addData(newData) | | | 4 | Set `noNewData` to `false` in processAllAvailable | | | 5 | | `dataAvailable` returns `false` | | 6 | | noNewData = true | | 7 | `noNewData` is true so just return | | | 8 | assert results and fail | | | 9 | | `dataAvailable` returns true so process the new batch | This PR expands the scope of `awaitBatchLock.synchronized` to eliminate the above race. ## How was this patch tested? test("stress test"). It always failed before this patch. And it will pass after applying this patch. Ignore this test in the PR as it takes several minutes to finish. Author: Shixiong Zhu <shixiong@databricks.com> Closes #12582 from zsxwing/SPARK-14579-2.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala9
2 files changed, 14 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index fc18e5f065..ce68c0968f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -242,12 +242,12 @@ class StreamExecution(
// method. See SPARK-14131.
//
// Check to see what new data is available.
- val newData = microBatchThread.runUninterruptibly {
- uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
- }
- availableOffsets ++= newData
-
val hasNewData = awaitBatchLock.synchronized {
+ val newData = microBatchThread.runUninterruptibly {
+ uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+ }
+ availableOffsets ++= newData
+
if (dataAvailable) {
true
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 1f28340545..74ca3977d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -26,6 +26,15 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
import testImplicits._
test("registering as a table") {
+ testRegisterAsTable()
+ }
+
+ ignore("stress test") {
+ // Ignore the stress test as it takes several minutes to run
+ (0 until 1000).foreach(_ => testRegisterAsTable())
+ }
+
+ private def testRegisterAsTable(): Unit = {
val input = MemoryStream[Int]
val query = input.toDF().write
.format("memory")