aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala56
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala6
2 files changed, 32 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8857966676..bcf0d970f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -284,42 +284,38 @@ class StreamExecution(
triggerExecutor.execute(() => {
startTrigger()
- val continueToRun =
- if (isActive) {
- reportTimeTaken("triggerExecution") {
- if (currentBatchId < 0) {
- // We'll do this initialization only once
- populateStartOffsets(sparkSessionToRunBatches)
- logDebug(s"Stream running from $committedOffsets to $availableOffsets")
- } else {
- constructNextBatch()
- }
- if (dataAvailable) {
- currentStatus = currentStatus.copy(isDataAvailable = true)
- updateStatusMessage("Processing new data")
- runBatch(sparkSessionToRunBatches)
- }
+ if (isActive) {
+ reportTimeTaken("triggerExecution") {
+ if (currentBatchId < 0) {
+ // We'll do this initialization only once
+ populateStartOffsets(sparkSessionToRunBatches)
+ logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+ } else {
+ constructNextBatch()
}
- // Report trigger as finished and construct progress object.
- finishTrigger(dataAvailable)
if (dataAvailable) {
- // Update committed offsets.
- batchCommitLog.add(currentBatchId)
- committedOffsets ++= availableOffsets
- logDebug(s"batch ${currentBatchId} committed")
- // We'll increase currentBatchId after we complete processing current batch's data
- currentBatchId += 1
- } else {
- currentStatus = currentStatus.copy(isDataAvailable = false)
- updateStatusMessage("Waiting for data to arrive")
- Thread.sleep(pollingDelayMs)
+ currentStatus = currentStatus.copy(isDataAvailable = true)
+ updateStatusMessage("Processing new data")
+ runBatch(sparkSessionToRunBatches)
}
- true
+ }
+ // Report trigger as finished and construct progress object.
+ finishTrigger(dataAvailable)
+ if (dataAvailable) {
+ // Update committed offsets.
+ batchCommitLog.add(currentBatchId)
+ committedOffsets ++= availableOffsets
+ logDebug(s"batch ${currentBatchId} committed")
+ // We'll increase currentBatchId after we complete processing current batch's data
+ currentBatchId += 1
} else {
- false
+ currentStatus = currentStatus.copy(isDataAvailable = false)
+ updateStatusMessage("Waiting for data to arrive")
+ Thread.sleep(pollingDelayMs)
}
+ }
updateStatusMessage("Waiting for next trigger")
- continueToRun
+ isActive
})
updateStatusMessage("Stopped")
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 03aa45b616..5bc36dd30f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -277,6 +277,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
def threadState =
if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
+ def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
+ s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
+ } else {
+ ""
+ }
def testState =
s"""
@@ -287,6 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
|Output Mode: $outputMode
|Stream state: $currentOffsets
|Thread state: $threadState
+ |$threadStackTrace
|${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
|
|== Sink ==