From a0454efe21e5c7ffe1b9bb7b18021a5580952e69 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Sun, 21 Sep 2014 13:04:36 -0700 Subject: [SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile Addresses the issue in https://issues.apache.org/jira/browse/SPARK-3595, namely saveAsHadoopFile hardcoding the OutputCommitter. This is not ideal when running Spark jobs that write to S3, especially when running them from an EMR cluster where the default OutputCommitter is a DirectOutputCommitter. Author: Ian Hummel Closes #2450 from themodernlife/spark-3595 and squashes the following commits: f37a0e5 [Ian Hummel] Update based on comments from pwendell a11d9f3 [Ian Hummel] Fix formatting 4359664 [Ian Hummel] Add an example showing usage 8b6be94 [Ian Hummel] Add ability to specify OutputCommitter, espcially useful when writing to an S3 bucket from an EMR cluster --- core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) (limited to 'core/src/main/scala') diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index f6703986bd..376e69cd99 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } } } else { - logWarning ("No need to commit output of task: " + taID.value) + logInfo ("No need to commit output of task: " + taID.value) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index f6d9d12fe9..51ba8c2d17 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName) hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) } - hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) + + // Use configured output committer if already set + if (conf.getOutputCommitter == null) { + hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) + } + FileOutputFormat.setOutputPath(hadoopConf, SparkHadoopWriter.createPathFromString(path, hadoopConf)) saveAsHadoopDataset(hadoopConf) -- cgit v1.2.3