diff options
author | Sean Owen <sowen@cloudera.com> | 2016-07-19 12:10:24 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-07-19 12:10:24 +0100 |
commit | 6c4b9f4be6b429197c6a53f937a82c2ac5866d65 (patch) | |
tree | 773e451197c192a1fb6c20b02d8fb0d984ab2239 /streaming/src/main/scala | |
parent | 8310c0741c0ca805ec74c1a78ba4a0f18e82d459 (diff) | |
download | spark-6c4b9f4be6b429197c6a53f937a82c2ac5866d65.tar.gz spark-6c4b9f4be6b429197c6a53f937a82c2ac5866d65.tar.bz2 spark-6c4b9f4be6b429197c6a53f937a82c2ac5866d65.zip |
[SPARK-16395][STREAMING] Fail if too many CheckpointWriteHandlers are queued up in the fixed thread pool
## What changes were proposed in this pull request?
Begin failing if checkpoint writes will likely keep up with storage's ability to write them, to fail fast instead of slowly filling memory
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes #14152 from srowen/SPARK-16395.
Diffstat (limited to 'streaming/src/main/scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 0b11026863..398fa6500f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -18,8 +18,8 @@ package org.apache.spark.streaming import java.io._ -import java.util.concurrent.Executors -import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException, + ThreadPoolExecutor, TimeUnit} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -184,7 +184,14 @@ class CheckpointWriter( hadoopConf: Configuration ) extends Logging { val MAX_ATTEMPTS = 3 - val executor = Executors.newFixedThreadPool(1) + + // Single-thread executor which rejects executions when a large amount have queued up. + // This fails fast since this typically means the checkpoint store will never keep up, and + // will otherwise lead to filling memory with waiting payloads of byte[] to write. + val executor = new ThreadPoolExecutor( + 1, 1, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue[Runnable](1000)) val compressionCodec = CompressionCodec.createCodec(conf) private var stopped = false @volatile private[this] var fs: FileSystem = null |