author     Burak Yavuz <brkyvz@gmail.com>               2017-04-12 11:24:59 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2017-04-12 11:24:59 -0700
commit     924c42477b5d6ed3c217c8eaaf4dc64b2379851a (patch)
tree       28209b8e1ad914b87812e07b08388c5ed16f6d51
parent     99a9473127ec389283ac4ec3b721d2e34434e647 (diff)
download   spark-924c42477b5d6ed3c217c8eaaf4dc64b2379851a.tar.gz
           spark-924c42477b5d6ed3c217c8eaaf4dc64b2379851a.tar.bz2
           spark-924c42477b5d6ed3c217c8eaaf4dc64b2379851a.zip
[SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in Structured Streaming tests
## What changes were proposed in this pull request?

Some Structured Streaming tests show flakiness such as:

```
[info] - prune results by current_date, complete mode - 696 *** FAILED *** (10 seconds, 937 milliseconds)
[info]   Timed out while stopping and waiting for microbatchthread to terminate.: The code passed to failAfter did not complete within 10 seconds.
```

This happens when we wait for the stream to stop, but it doesn't stop. The reason it doesn't stop is that we interrupt the microBatchThread, but Hadoop's `Shell.runCommand` swallows the interrupt exception, so the exception is never propagated up to the microBatchThread. The thread then keeps running, only to end up blocking on the `streamManualClock`.

## How was this patch tested?

Thousands of retries locally and on [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75720/testReport) for the flaky tests.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17613 from brkyvz/flaky-stream-agg.
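To make the failure mode concrete: if a call made from the stream's run loop catches and discards `InterruptedException` (as Hadoop's `Shell.runCommand` can), interrupting the thread is not enough on its own; the loop must also consult an explicit stop flag. Below is a minimal, self-contained sketch of that situation, not Spark code: `isActive` mirrors the flag used in `StreamExecution`, while `swallowingCall` and the timings are illustrative assumptions.

```scala
// Minimal sketch (not Spark code): a callee that swallows InterruptedException keeps
// the worker running after interrupt(), so the loop must also check a stop flag.
object InterruptSwallowSketch {
  @volatile private var isActive = true

  // Stand-in for a call such as Hadoop's Shell.runCommand that absorbs the interrupt.
  private def swallowingCall(): Unit =
    try Thread.sleep(1000)
    catch { case _: InterruptedException => () } // the interrupt is lost here

  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      // Re-checking isActive each iteration lets the thread stop even though
      // the interrupt was swallowed inside swallowingCall().
      override def run(): Unit = while (isActive) swallowingCall()
    })
    worker.start()
    Thread.sleep(100)
    isActive = false    // request stop via the flag
    worker.interrupt()  // on its own, this interrupt would not end the loop
    worker.join(5000)
    println(s"worker alive after join: ${worker.isAlive}") // expected: false
  }
}
```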
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 56
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                |  6
2 files changed, 32 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8857966676..bcf0d970f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -284,42 +284,38 @@ class StreamExecution(
triggerExecutor.execute(() => {
startTrigger()
- val continueToRun =
- if (isActive) {
- reportTimeTaken("triggerExecution") {
- if (currentBatchId < 0) {
- // We'll do this initialization only once
- populateStartOffsets(sparkSessionToRunBatches)
- logDebug(s"Stream running from $committedOffsets to $availableOffsets")
- } else {
- constructNextBatch()
- }
- if (dataAvailable) {
- currentStatus = currentStatus.copy(isDataAvailable = true)
- updateStatusMessage("Processing new data")
- runBatch(sparkSessionToRunBatches)
- }
+ if (isActive) {
+ reportTimeTaken("triggerExecution") {
+ if (currentBatchId < 0) {
+ // We'll do this initialization only once
+ populateStartOffsets(sparkSessionToRunBatches)
+ logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+ } else {
+ constructNextBatch()
}
- // Report trigger as finished and construct progress object.
- finishTrigger(dataAvailable)
if (dataAvailable) {
- // Update committed offsets.
- batchCommitLog.add(currentBatchId)
- committedOffsets ++= availableOffsets
- logDebug(s"batch ${currentBatchId} committed")
- // We'll increase currentBatchId after we complete processing current batch's data
- currentBatchId += 1
- } else {
- currentStatus = currentStatus.copy(isDataAvailable = false)
- updateStatusMessage("Waiting for data to arrive")
- Thread.sleep(pollingDelayMs)
+ currentStatus = currentStatus.copy(isDataAvailable = true)
+ updateStatusMessage("Processing new data")
+ runBatch(sparkSessionToRunBatches)
}
- true
+ }
+ // Report trigger as finished and construct progress object.
+ finishTrigger(dataAvailable)
+ if (dataAvailable) {
+ // Update committed offsets.
+ batchCommitLog.add(currentBatchId)
+ committedOffsets ++= availableOffsets
+ logDebug(s"batch ${currentBatchId} committed")
+ // We'll increase currentBatchId after we complete processing current batch's data
+ currentBatchId += 1
} else {
- false
+ currentStatus = currentStatus.copy(isDataAvailable = false)
+ updateStatusMessage("Waiting for data to arrive")
+ Thread.sleep(pollingDelayMs)
}
+ }
updateStatusMessage("Waiting for next trigger")
- continueToRun
+ isActive
})
updateStatusMessage("Stopped")
} else {
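The net effect of the hunk above: the old code fixed the loop's continuation value (`continueToRun`) inside the `isActive` branch, so if `stop()` flipped the flag while a batch was running and the interrupt was swallowed, the callback still returned `true` and the loop kept going; the new code runs the trigger body only while active and returns a fresh read of `isActive`. Here is a small runnable sketch of that loop contract, modeled on the `execute(() => ...)` call in the diff; names other than `isActive` are placeholders, not Spark's.

```scala
// Runnable sketch of the fixed trigger loop; execute() stands in for the
// trigger executor, runOneTrigger() for the batch-processing body.
object TriggerLoopSketch {
  @volatile var isActive = true

  // Keep invoking the trigger body until it returns false.
  def execute(triggerBody: () => Boolean): Unit = {
    while (triggerBody()) {}
  }

  def runOneTrigger(): Unit = Thread.sleep(10) // placeholder for real batch work

  def main(args: Array[String]): Unit = {
    val loop = new Thread(new Runnable {
      override def run(): Unit = execute { () =>
        if (isActive) {
          runOneTrigger()
        }
        isActive // fresh read: a stop requested mid-batch ends the loop on the next check
      }
    })
    loop.start()
    Thread.sleep(50)
    isActive = false // the equivalent of stop(): no interrupt needed for the loop to exit
    loop.join(5000)
    println(s"loop alive after join: ${loop.isAlive}") // expected: false
  }
}
```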
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 03aa45b616..5bc36dd30f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -277,6 +277,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
def threadState =
if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
+ def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
+ s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
+ } else {
+ ""
+ }
def testState =
s"""
@@ -287,6 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
|Output Mode: $outputMode
|Stream state: $currentOffsets
|Thread state: $threadState
+ |$threadStackTrace
|${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
|
|== Sink ==
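The test-side change appends the micro-batch thread's stack trace to the failure message, so a timed-out stop shows exactly where the thread is stuck. Below is a standalone sketch of the same idea; the names are illustrative, and only `isAlive` and `getStackTrace` are the real `java.lang.Thread` API.

```scala
// Standalone sketch: render a live thread's current stack trace into a diagnostic
// string, mirroring the threadStackTrace helper added to StreamTest above.
object StackTraceSketch {
  def describe(thread: Thread): String =
    if (thread != null && thread.isAlive) {
      s"Thread stack trace: ${thread.getStackTrace.mkString("\n")}"
    } else {
      ""
    }

  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        try Thread.sleep(10000)
        catch { case _: InterruptedException => () } // stop quietly when interrupted
    })
    worker.setName("sketch-worker")
    worker.start()
    Thread.sleep(100)          // give the worker time to block in sleep
    println(describe(worker))  // frames show where the worker is currently blocked
    worker.interrupt()
    worker.join()
  }
}
```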