author    Sean Owen <sowen@cloudera.com>    2016-07-19 12:10:24 +0100
committer Sean Owen <sowen@cloudera.com>    2016-07-19 12:10:24 +0100
commit    6c4b9f4be6b429197c6a53f937a82c2ac5866d65 (patch)
tree      773e451197c192a1fb6c20b02d8fb0d984ab2239 /streaming/src/main/scala
parent    8310c0741c0ca805ec74c1a78ba4a0f18e82d459 (diff)
[SPARK-16395][STREAMING] Fail if too many CheckpointWriteHandlers are queued up in the fixed thread pool
## What changes were proposed in this pull request?

Begin failing if checkpoint writes will likely not keep up with storage's ability to write them, to fail fast instead of slowly filling memory.

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #14152 from srowen/SPARK-16395.
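To illustrate the fail-fast behavior this change relies on, here is a minimal standalone sketch (not Spark code; the object name and the queue capacity of 2 are chosen only to trigger rejection quickly, whereas the actual change uses a capacity of 1000):

```scala
import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException, ThreadPoolExecutor, TimeUnit}

object BoundedCheckpointQueueSketch {
  def main(args: Array[String]): Unit = {
    // One worker thread, and at most 2 queued tasks (the real change allows 1000).
    val executor = new ThreadPoolExecutor(
      1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](2))

    // Stand-in for a slow checkpoint write that the storage layer cannot keep up with.
    val slowWrite = new Runnable {
      override def run(): Unit = Thread.sleep(1000)
    }

    try {
      // 1 task runs and 2 wait in the queue; the 4th submission is rejected immediately by
      // the executor's default AbortPolicy instead of buffering another byte[] payload.
      (1 to 4).foreach(_ => executor.execute(slowWrite))
    } catch {
      case e: RejectedExecutionException => println(s"Failing fast: $e")
    } finally {
      executor.shutdownNow()
    }
  }
}
```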
Diffstat (limited to 'streaming/src/main/scala')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 13
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0b11026863..398fa6500f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -18,8 +18,8 @@
package org.apache.spark.streaming
import java.io._
-import java.util.concurrent.Executors
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException,
+ ThreadPoolExecutor, TimeUnit}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -184,7 +184,14 @@ class CheckpointWriter(
hadoopConf: Configuration
) extends Logging {
val MAX_ATTEMPTS = 3
- val executor = Executors.newFixedThreadPool(1)
+
+  // Single-thread executor which rejects executions when a large number have queued up.
+  // This fails fast since this typically means the checkpoint store will never keep up, and
+  // will otherwise lead to filling memory with waiting byte[] payloads to write.
+ val executor = new ThreadPoolExecutor(
+ 1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue[Runnable](1000))
val compressionCodec = CompressionCodec.createCodec(conf)
private var stopped = false
@volatile private[this] var fs: FileSystem = null
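The import of RejectedExecutionException is kept because the submission path presumably catches the rejection; a hedged sketch of that caller-side pattern follows (hypothetical class and method names, since the actual CheckpointWriter.write method lies outside the hunk shown above):

```scala
import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}

// Hypothetical sketch only; the real handling lives elsewhere in CheckpointWriter,
// outside the diff hunk above.
class CheckpointSubmitterSketch(executor: ThreadPoolExecutor) {
  def submit(task: Runnable): Unit = {
    try {
      executor.execute(task)
    } catch {
      case rej: RejectedExecutionException =>
        // Report the backlog immediately rather than letting byte[] payloads pile up.
        System.err.println(s"Could not submit checkpoint task: $rej")
    }
  }
}
```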