about summary refs log tree commit diff
path: root/streaming/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 13
1 file changed, 10 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0b11026863..398fa6500f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -18,8 +18,8 @@
package org.apache.spark.streaming
import java.io._
-import java.util.concurrent.Executors
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException,
+ ThreadPoolExecutor, TimeUnit}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -184,7 +184,14 @@ class CheckpointWriter(
hadoopConf: Configuration
) extends Logging {
val MAX_ATTEMPTS = 3
- val executor = Executors.newFixedThreadPool(1)
+
+ // Single-thread executor which rejects executions when a large amount have queued up.
+ // This fails fast since this typically means the checkpoint store will never keep up, and
+ // will otherwise lead to filling memory with waiting payloads of byte[] to write.
+ val executor = new ThreadPoolExecutor(
+ 1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue[Runnable](1000))
val compressionCodec = CompressionCodec.createCodec(conf)
private var stopped = false
@volatile private[this] var fs: FileSystem = null