aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2015-11-24 20:58:47 -0800
committerShixiong Zhu <shixiong@databricks.com>2015-11-24 20:58:47 -0800
commita5d988763319f63a8e2b58673dd4f9098f17c835 (patch)
tree5ee8cbdc22354756bb547849ac2881c698e03d90 /streaming
parent4d6bbbc03ddb6650b00eb638e4876a196014c19c (diff)
downloadspark-a5d988763319f63a8e2b58673dd4f9098f17c835.tar.gz
spark-a5d988763319f63a8e2b58673dd4f9098f17c835.tar.bz2
spark-a5d988763319f63a8e2b58673dd4f9098f17c835.zip
[STREAMING][FLAKY-TEST] Catch execution context race condition in `FileBasedWriteAheadLog.close()`
There is a race condition in `FileBasedWriteAheadLog.close()`, where if deletes of old log files are in progress, the write ahead log may close, and result in a `RejectedExecutionException`. This is okay, and should be handled gracefully. Example test failures: https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/ The reason the test fails is in `afterEach`, `writeAheadLog.close` is called, and there may still be async deletes in flight. tdas zsxwing Author: Burak Yavuz <brkyvz@gmail.com> Closes #9953 from brkyvz/flaky-ss.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala16
1 file changed, 11 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1a9c..f5165f7c39 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.util
import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
}
oldLogFiles.foreach { logInfo =>
if (!executionContext.isShutdown) {
- val f = Future { deleteFile(logInfo) }(executionContext)
- if (waitForCompletion) {
- import scala.concurrent.duration._
- Await.ready(f, 1 second)
+ try {
+ val f = Future { deleteFile(logInfo) }(executionContext)
+ if (waitForCompletion) {
+ import scala.concurrent.duration._
+ Await.ready(f, 1 second)
+ }
+ } catch {
+ case e: RejectedExecutionException =>
+ logWarning("Execution context shutdown before deleting old WriteAheadLogs. " +
+ "This would not affect recovery correctness.", e)
}
}
}