author     Shixiong Zhu <shixiong@databricks.com>       2016-04-06 12:28:04 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2016-04-06 12:28:04 -0700
commit     a4ead6d3881f071a2ae53ff1c961c6ac388cac1d (patch)
tree       60e6875acb4485401b6996bb1bbda472a1df98f4 /sql/core
parent     9c6556c5f8ab013b36312db4bf02c4c6d965a535 (diff)
[SPARK-14382][SQL] QueryProgress should be posted after committedOffsets is updated
## What changes were proposed in this pull request?

Make sure QueryProgress is posted after committedOffsets is updated. If QueryProgress is posted before committedOffsets is updated, the listener may see a wrong sinkStatus (which is created from committedOffsets). See https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.2/644/testReport/junit/org.apache.spark.sql.util/ContinuousQueryListenerSuite/single_listener/ for an example of the failure.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12155 from zsxwing/SPARK-14382.
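The race is easiest to see in isolation. Below is a minimal Scala sketch of the ordering fix, not Spark's actual implementation: `committedOffsets`, `availableOffsets`, and the commit-then-post ordering mirror the diff, while the object, the event stand-in, and the offset values are simplified assumptions for illustration.

```scala
// Minimal sketch of the SPARK-14382 ordering fix (simplified stand-ins,
// not Spark's actual StreamExecution).
object ProgressOrderingSketch {

  // In StreamExecution, a listener's sinkStatus is derived from
  // committedOffsets, so the map must be current before the event fires.
  @volatile private var committedOffsets = Map.empty[String, Long]
  private var availableOffsets = Map.empty[String, Long]

  // Stand-in for postEvent(new QueryProgress(this)): the listener reads
  // committedOffsets when it handles the event.
  private def postQueryProgress(): Unit =
    println(s"QueryProgress: committed offsets = $committedOffsets")

  private def runBatch(): Unit = {
    // Pretend one source produced data up to offset 42 and the sink wrote it.
    availableOffsets = Map("source-0" -> 42L)
  }

  def main(args: Array[String]): Unit = {
    runBatch()

    // Before the fix, the event could fire while committedOffsets was still
    // stale, so a listener saw a sinkStatus behind what the sink had written.
    // The fix commits first, then posts:
    committedOffsets ++= availableOffsets
    postQueryProgress()
  }
}
```

Posting only after the commit guarantees that any listener reading `committedOffsets` while handling the progress event sees the offsets the batch actually wrote.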
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala  | 15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala    |  3
2 files changed, 6 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3e4acb752a..688e051e1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -159,7 +159,7 @@ class StreamExecution(
triggerExecutor.execute(() => {
if (isActive) {
if (dataAvailable) runBatch()
- commitAndConstructNextBatch()
+ constructNextBatch()
true
} else {
false
@@ -207,7 +207,7 @@ class StreamExecution(
case None => // We are starting this stream for the first time.
logInfo(s"Starting new continuous query.")
currentBatchId = 0
- commitAndConstructNextBatch()
+ constructNextBatch()
}
}
@@ -227,15 +227,8 @@ class StreamExecution(
/**
* Queries all of the sources to see if any new data is available. When there is new data the
* batchId counter is incremented and a new log entry is written with the newest offsets.
- *
- * Note that committing the offsets for a new batch implicitly marks the previous batch as
- * finished and thus this method should only be called when all currently available data
- * has been written to the sink.
*/
- private def commitAndConstructNextBatch(): Boolean = {
- // Update committed offsets.
- committedOffsets ++= availableOffsets
-
+ private def constructNextBatch(): Boolean = {
// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
// If we interrupt some thread running Shell.runCommand, we may hit this issue.
// As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
@@ -331,6 +324,8 @@ class StreamExecution(
val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
+ // Update committed offsets.
+ committedOffsets ++= availableOffsets
postEvent(new QueryProgress(this))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
index d04783ecac..3498fe83d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
@@ -146,7 +146,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
- @volatile var query: StreamExecution = null
try {
failAfter(1 minute) {
sqlContext.streams.addListener(listener)
@@ -212,7 +211,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
case class QueryStatus(
active: Boolean,
- expection: Option[Exception],
+ exception: Option[Exception],
sourceStatuses: Array[SourceStatus],
sinkStatus: SinkStatus)