aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-07-07 03:32:26 -0700
committerShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-07-07 04:09:37 -0700
commit3350ad0d7fc3ecece78f87d7aa6a727e48b21c8c (patch)
treeddd55d3ec413c4c599927cb295dd17cedc8cb454 /streaming
parent7d6d9e6ab226579518f1c7fbe108c4e66acc6ed0 (diff)
downloadspark-3350ad0d7fc3ecece78f87d7aa6a727e48b21c8c.tar.gz
spark-3350ad0d7fc3ecece78f87d7aa6a727e48b21c8c.tar.bz2
spark-3350ad0d7fc3ecece78f87d7aa6a727e48b21c8c.zip
Catch RejectedExecution exception in Checkpoint handler.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala9
1 files changed, 7 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 66e67cbfa1..450e48d66e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -8,7 +8,7 @@ import org.apache.hadoop.conf.Configuration
import java.io._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.util.concurrent.Executors
-
+import java.util.concurrent.RejectedExecutionException
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
@@ -91,7 +91,12 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
oos.writeObject(checkpoint)
oos.close()
bos.close()
- executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ try {
+ executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ } catch {
+ case rej: RejectedExecutionException =>
+ logError("Could not submit checkpoint task to the thread pool executor", rej)
+ }
}
def stop() {