From 6c4b9f4be6b429197c6a53f937a82c2ac5866d65 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 19 Jul 2016 12:10:24 +0100 Subject: [SPARK-16395][STREAMING] Fail if too many CheckpointWriteHandlers are queued up in the fixed thread pool ## What changes were proposed in this pull request? Begin failing if checkpoint writes will likely keep up with storage's ability to write them, to fail fast instead of slowly filling memory ## How was this patch tested? Jenkins tests Author: Sean Owen Closes #14152 from srowen/SPARK-16395. --- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'streaming/src/main/scala') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 0b11026863..398fa6500f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -18,8 +18,8 @@ package org.apache.spark.streaming import java.io._ -import java.util.concurrent.Executors -import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException, + ThreadPoolExecutor, TimeUnit} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -184,7 +184,14 @@ class CheckpointWriter( hadoopConf: Configuration ) extends Logging { val MAX_ATTEMPTS = 3 - val executor = Executors.newFixedThreadPool(1) + + // Single-thread executor which rejects executions when a large amount have queued up. + // This fails fast since this typically means the checkpoint store will never keep up, and + // will otherwise lead to filling memory with waiting payloads of byte[] to write. + val executor = new ThreadPoolExecutor( + 1, 1, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue[Runnable](1000)) val compressionCodec = CompressionCodec.createCodec(conf) private var stopped = false @volatile private[this] var fs: FileSystem = null -- cgit v1.2.3