diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-04-06 12:28:04 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-04-06 12:28:04 -0700 |
commit | a4ead6d3881f071a2ae53ff1c961c6ac388cac1d (patch) | |
tree | 60e6875acb4485401b6996bb1bbda472a1df98f4 /sql/core | |
parent | 9c6556c5f8ab013b36312db4bf02c4c6d965a535 (diff) | |
download | spark-a4ead6d3881f071a2ae53ff1c961c6ac388cac1d.tar.gz spark-a4ead6d3881f071a2ae53ff1c961c6ac388cac1d.tar.bz2 spark-a4ead6d3881f071a2ae53ff1c961c6ac388cac1d.zip |
[SPARK-14382][SQL] QueryProgress should be post after committedOffsets is updated
## What changes were proposed in this pull request?
Make sure QueryProgress is post after committedOffsets is updated. If QueryProgress is post before committedOffsets is updated, the listener may see a wrong sinkStatus (created from committedOffsets).
See https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.2/644/testReport/junit/org.apache.spark.sql.util/ContinuousQueryListenerSuite/single_listener/ for an example of the failure.
## How was this patch tested?
Existing unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12155 from zsxwing/SPARK-14382.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 15 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala | 3 |
2 files changed, 6 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 3e4acb752a..688e051e1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -159,7 +159,7 @@ class StreamExecution( triggerExecutor.execute(() => { if (isActive) { if (dataAvailable) runBatch() - commitAndConstructNextBatch() + constructNextBatch() true } else { false @@ -207,7 +207,7 @@ class StreamExecution( case None => // We are starting this stream for the first time. logInfo(s"Starting new continuous query.") currentBatchId = 0 - commitAndConstructNextBatch() + constructNextBatch() } } @@ -227,15 +227,8 @@ class StreamExecution( /** * Queries all of the sources to see if any new data is available. When there is new data the * batchId counter is incremented and a new log entry is written with the newest offsets. - * - * Note that committing the offsets for a new batch implicitly marks the previous batch as - * finished and thus this method should only be called when all currently available data - * has been written to the sink. */ - private def commitAndConstructNextBatch(): Boolean = { - // Update committed offsets. - committedOffsets ++= availableOffsets - + private def constructNextBatch(): Boolean = { // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). // If we interrupt some thread running Shell.runCommand, we may hit this issue. // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand" @@ -331,6 +324,8 @@ class StreamExecution( val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") + // Update committed offsets. + committedOffsets ++= availableOffsets postEvent(new QueryProgress(this)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala index d04783ecac..3498fe83d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala @@ -146,7 +146,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = { - @volatile var query: StreamExecution = null try { failAfter(1 minute) { sqlContext.streams.addListener(listener) @@ -212,7 +211,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with case class QueryStatus( active: Boolean, - expection: Option[Exception], + exception: Option[Exception], sourceStatuses: Array[SourceStatus], sinkStatus: SinkStatus) |