Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala')
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 40 +++++++++++++++++++++++++++-------------
 1 file changed, 27 insertions(+), 13 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 688e051e1f..87dd27a2b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -59,12 +59,14 @@ class StreamExecution(
* Tracks how much data we have processed and committed to the sink or state store from each
* input source.
*/
+ @volatile
private[sql] var committedOffsets = new StreamProgress
/**
* Tracks the offsets that are available to be processed, but have not yet been committed to the
* sink.
*/
+ @volatile
private var availableOffsets = new StreamProgress
/** The current batchId or -1 if execution has not yet been initialized. */
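The two @volatile annotations above are the heart of this hunk: committedOffsets and availableOffsets are vars reassigned by the stream's microbatch thread but read from other threads (status queries, test helpers such as awaitOffset). Without @volatile, the JVM memory model allows a reader thread to keep seeing a stale reference indefinitely. A minimal sketch of the hazard, using a hypothetical Progress case class rather than Spark's StreamProgress:

// Sketch only: Progress stands in for StreamProgress.
object VolatileVisibilitySketch {
  final case class Progress(batchId: Long)

  // Without @volatile the reader below may never observe the
  // writer's reassignment under the JVM memory model.
  @volatile private var progress = Progress(-1L)

  def main(args: Array[String]): Unit = {
    val writer = new Thread(new Runnable {
      override def run(): Unit =
        for (id <- 0L to 3L) { progress = Progress(id); Thread.sleep(50) }
    })
    val reader = new Thread(new Runnable {
      override def run(): Unit = {
        var last = -1L
        while (last < 3L) {
          val p = progress   // read once; @volatile keeps the reference fresh
          if (p.batchId > last) { println(s"saw batch ${p.batchId}"); last = p.batchId }
        }
      }
    })
    writer.start(); reader.start()
    writer.join(); reader.join()
  }
}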
@@ -111,7 +113,8 @@ class StreamExecution(
/** Returns current status of all the sources. */
override def sourceStatuses: Array[SourceStatus] = {
- sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray
+ val localAvailableOffsets = availableOffsets
+ sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray
}
/** Returns current status of the sink. */
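The sourceStatuses fix shows the companion pattern: read a @volatile var exactly once into a local, then do all work against that snapshot, so every Source is paired with offsets from the same StreamProgress. Reading availableOffsets inside the map closure could observe a different object per element. A hedged sketch of the pattern in isolation, with a plain immutable Map standing in for StreamProgress:

// Sketch of the read-once snapshot pattern from sourceStatuses.
class SnapshotReadSketch {
  @volatile private var offsets: Map[String, Long] = Map.empty

  def update(source: String, offset: Long): Unit =
    offsets = offsets + (source -> offset)   // publish a new immutable map

  // Racy: two reads of `offsets` can see different maps if a writer
  // interleaves, so contains and apply may disagree.
  def statusRacy(source: String): Option[Long] =
    if (offsets.contains(source)) Some(offsets(source)) else None

  // Safe: snapshot the reference once, then read only the local.
  def statusConsistent(source: String): Option[Long] = {
    val local = offsets
    if (local.contains(source)) Some(local(source)) else None
  }
}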
@@ -228,7 +231,7 @@ class StreamExecution(
* Queries all of the sources to see if any new data is available. When there is new data, the
* batchId counter is incremented and a new log entry is written with the newest offsets.
*/
- private def constructNextBatch(): Boolean = {
+ private def constructNextBatch(): Unit = {
// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
// If we interrupt some thread running Shell.runCommand, we may hit this issue.
// As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
@@ -241,7 +244,15 @@ class StreamExecution(
}
availableOffsets ++= newData
- if (dataAvailable) {
+ val hasNewData = awaitBatchLock.synchronized {
+ if (dataAvailable) {
+ true
+ } else {
+ noNewData = true
+ false
+ }
+ }
+ if (hasNewData) {
// There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
// If we interrupt some thread running Shell.runCommand, we may hit this issue.
// As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set
@@ -254,15 +265,11 @@ class StreamExecution(
}
currentBatchId += 1
logInfo(s"Committed offsets for batch $currentBatchId.")
- true
} else {
- noNewData = true
awaitBatchLock.synchronized {
// Wake up any threads that are waiting for the stream to progress.
awaitBatchLock.notifyAll()
}
-
- false
}
}
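This is the core race fix: deciding "no new data" and setting the noNewData flag now happen inside one awaitBatchLock.synchronized block. Previously the flag was written outside the lock, so a thread in processAllAvailable could check the flag, see false, and then sleep through the subsequent notifyAll. A sketch of the pattern, with a simple counter standing in for dataAvailable and the batch body reduced to a placeholder:

// Sketch: the availability check and the flag write share one lock,
// the same lock that processAllAvailable waits on.
class BatchSignalSketch {
  private val awaitBatchLock = new Object
  @volatile private var noNewData = false
  @volatile var pending: Int = 0   // stand-in for dataAvailable

  def constructNextBatch(): Unit = {
    val hasNewData = awaitBatchLock.synchronized {
      if (pending > 0) true
      else { noNewData = true; false }   // flag set atomically with the check
    }
    if (hasNewData) {
      pending -= 1   // placeholder for running the batch
    } else {
      awaitBatchLock.synchronized {
        awaitBatchLock.notifyAll()   // wake threads waiting for progress
      }
    }
  }
}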
@@ -353,7 +360,10 @@ class StreamExecution(
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
def awaitOffset(source: Source, newOffset: Offset): Unit = {
- def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset
+ def notDone = {
+ val localCommittedOffsets = committedOffsets
+ !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset
+ }
while (notDone) {
logInfo(s"Waiting until $newOffset at $source")
@@ -365,13 +375,17 @@ class StreamExecution(
/** A flag to indicate that a batch has completed with no new data available. */
@volatile private var noNewData = false
- override def processAllAvailable(): Unit = {
+ override def processAllAvailable(): Unit = awaitBatchLock.synchronized {
noNewData = false
- while (!noNewData) {
- awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
- if (streamDeathCause != null) { throw streamDeathCause }
+ while (true) {
+ awaitBatchLock.wait(10000)
+ if (streamDeathCause != null) {
+ throw streamDeathCause
+ }
+ if (noNewData) {
+ return
+ }
}
- if (streamDeathCause != null) { throw streamDeathCause }
}
override def awaitTermination(): Unit = {
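processAllAvailable now holds awaitBatchLock for the entire loop, so resetting noNewData, waiting, and re-checking the flag are atomic with respect to the producer's set-flag-then-notifyAll in constructNextBatch; the old version could miss a wakeup between its flag check and its wait. The 10-second timeout bounds each sleep so a streamDeathCause is noticed even without a notify. A condensed sketch of both sides of the handshake:

// Sketch of the consumer/producer handshake after this patch.
class ProcessAllAvailableSketch {
  private val awaitBatchLock = new Object
  @volatile private var noNewData = false
  @volatile private var streamDeathCause: Throwable = null

  // Producer side (constructNextBatch): flag and notify under the lock.
  def signalNoNewData(): Unit = awaitBatchLock.synchronized {
    noNewData = true
    awaitBatchLock.notifyAll()
  }

  // Consumer side: the whole check/wait loop runs under the same lock.
  def processAllAvailable(): Unit = awaitBatchLock.synchronized {
    noNewData = false
    while (true) {
      awaitBatchLock.wait(10000)   // bounded wait; re-check periodically
      if (streamDeathCause != null) throw streamDeathCause
      if (noNewData) return
    }
  }
}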