From a4ead6d3881f071a2ae53ff1c961c6ac388cac1d Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 6 Apr 2016 12:28:04 -0700 Subject: [SPARK-14382][SQL] QueryProgress should be post after committedOffsets is updated ## What changes were proposed in this pull request? Make sure QueryProgress is post after committedOffsets is updated. If QueryProgress is post before committedOffsets is updated, the listener may see a wrong sinkStatus (created from committedOffsets). See https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.2/644/testReport/junit/org.apache.spark.sql.util/ContinuousQueryListenerSuite/single_listener/ for an example of the failure. ## How was this patch tested? Existing unit tests. Author: Shixiong Zhu Closes #12155 from zsxwing/SPARK-14382. --- .../spark/sql/execution/streaming/StreamExecution.scala | 15 +++++---------- .../spark/sql/util/ContinuousQueryListenerSuite.scala | 3 +-- 2 files changed, 6 insertions(+), 12 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 3e4acb752a..688e051e1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -159,7 +159,7 @@ class StreamExecution( triggerExecutor.execute(() => { if (isActive) { if (dataAvailable) runBatch() - commitAndConstructNextBatch() + constructNextBatch() true } else { false @@ -207,7 +207,7 @@ class StreamExecution( case None => // We are starting this stream for the first time. logInfo(s"Starting new continuous query.") currentBatchId = 0 - commitAndConstructNextBatch() + constructNextBatch() } } @@ -227,15 +227,8 @@ class StreamExecution( /** * Queries all of the sources to see if any new data is available. When there is new data the * batchId counter is incremented and a new log entry is written with the newest offsets. - * - * Note that committing the offsets for a new batch implicitly marks the previous batch as - * finished and thus this method should only be called when all currently available data - * has been written to the sink. */ - private def commitAndConstructNextBatch(): Boolean = { - // Update committed offsets. - committedOffsets ++= availableOffsets - + private def constructNextBatch(): Boolean = { // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). // If we interrupt some thread running Shell.runCommand, we may hit this issue. // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand" @@ -331,6 +324,8 @@ class StreamExecution( val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") + // Update committed offsets. + committedOffsets ++= availableOffsets postEvent(new QueryProgress(this)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala index d04783ecac..3498fe83d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala @@ -146,7 +146,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = { - @volatile var query: StreamExecution = null try { failAfter(1 minute) { sqlContext.streams.addListener(listener) @@ -212,7 +211,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with case class QueryStatus( active: Boolean, - expection: Option[Exception], + exception: Option[Exception], sourceStatuses: Array[SourceStatus], sinkStatus: SinkStatus) -- cgit v1.2.3