diff options
author | Shivaram Venkataraman <shivaram@eecs.berkeley.edu> | 2013-07-07 03:32:26 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@eecs.berkeley.edu> | 2013-07-07 04:09:37 -0700 |
commit | 3350ad0d7fc3ecece78f87d7aa6a727e48b21c8c (patch) | |
tree | ddd55d3ec413c4c599927cb295dd17cedc8cb454 /streaming/src/main | |
parent | 7d6d9e6ab226579518f1c7fbe108c4e66acc6ed0 (diff) | |
download | spark-3350ad0d7fc3ecece78f87d7aa6a727e48b21c8c.tar.gz spark-3350ad0d7fc3ecece78f87d7aa6a727e48b21c8c.tar.bz2 spark-3350ad0d7fc3ecece78f87d7aa6a727e48b21c8c.zip |
Catch RejectedExecution exception in Checkpoint handler.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/Checkpoint.scala | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 66e67cbfa1..450e48d66e 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -8,7 +8,7 @@ import org.apache.hadoop.conf.Configuration import java.io._ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import java.util.concurrent.Executors - +import java.util.concurrent.RejectedExecutionException private[streaming] class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) @@ -91,7 +91,12 @@ class CheckpointWriter(checkpointDir: String) extends Logging { oos.writeObject(checkpoint) oos.close() bos.close() - executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + try { + executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + } catch { + case rej: RejectedExecutionException => + logError("Could not submit checkpoint task to the thread pool executor", rej) + } } def stop() { |