aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala9
1 files changed, 7 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 66e67cbfa1..450e48d66e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -8,7 +8,7 @@ import org.apache.hadoop.conf.Configuration
import java.io._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.util.concurrent.Executors
-
+import java.util.concurrent.RejectedExecutionException
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
@@ -91,7 +91,12 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
oos.writeObject(checkpoint)
oos.close()
bos.close()
- executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ try {
+ executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ } catch {
+ case rej: RejectedExecutionException =>
+ logError("Could not submit checkpoint task to the thread pool executor", rej)
+ }
}
def stop() {