aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala16
1 files changed, 11 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1a9c..f5165f7c39 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.util
import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
}
oldLogFiles.foreach { logInfo =>
if (!executionContext.isShutdown) {
- val f = Future { deleteFile(logInfo) }(executionContext)
- if (waitForCompletion) {
- import scala.concurrent.duration._
- Await.ready(f, 1 second)
+ try {
+ val f = Future { deleteFile(logInfo) }(executionContext)
+ if (waitForCompletion) {
+ import scala.concurrent.duration._
+ Await.ready(f, 1 second)
+ }
+ } catch {
+ case e: RejectedExecutionException =>
+ logWarning("Execution context shutdown before deleting old WriteAheadLogs. " +
+ "This would not affect recovery correctness.", e)
}
}
}