author     Ian Hummel <ian@themodernlife.net>      2014-09-21 13:04:36 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-09-21 13:04:36 -0700
commit     a0454efe21e5c7ffe1b9bb7b18021a5580952e69 (patch)
tree       9c7df79201b003b81e0c54cb07283a69088860dd /core/src/main/scala
parent     d112a6c79dee7b5d8459696f97d329190e8d09a5 (diff)
download   spark-a0454efe21e5c7ffe1b9bb7b18021a5580952e69.tar.gz
           spark-a0454efe21e5c7ffe1b9bb7b18021a5580952e69.tar.bz2
           spark-a0454efe21e5c7ffe1b9bb7b18021a5580952e69.zip
[SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile
Addresses the issue in https://issues.apache.org/jira/browse/SPARK-3595, namely saveAsHadoopFile hardcoding the OutputCommitter. This is not ideal when running Spark jobs that write to S3, especially when running them from an EMR cluster, where the default OutputCommitter is a DirectOutputCommitter.

Author: Ian Hummel <ian@themodernlife.net>

Closes #2450 from themodernlife/spark-3595 and squashes the following commits:

f37a0e5 [Ian Hummel] Update based on comments from pwendell
a11d9f3 [Ian Hummel] Fix formatting
4359664 [Ian Hummel] Add an example showing usage
8b6be94 [Ian Hummel] Add ability to specify OutputCommitter, especially useful when writing to an S3 bucket from an EMR cluster
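A minimal usage sketch (not code from this commit): with the change below, a job can register its own committer on the JobConf it passes to saveAsHadoopFile, and Spark will only fall back to FileOutputCommitter when no committer is configured. The no-op DirectOutputCommitter class, the bucket path, and the SparkContext `sc` are illustrative assumptions.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{JobConf, JobContext, OutputCommitter, TaskAttemptContext, TextOutputFormat}
import org.apache.spark.SparkContext._

// Illustrative no-op committer in the spirit of EMR's DirectOutputCommitter:
// tasks write straight to the final location, so there is nothing to move on commit.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}

// Register the committer on the JobConf handed to saveAsHadoopFile; Spark no
// longer overrides it with FileOutputCommitter once one is set.
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.setOutputCommitter(classOf[DirectOutputCommitter])

sc.parallelize(Seq(("a", 1), ("b", 2)))
  .saveAsHadoopFile(
    "s3://my-bucket/output",                        // hypothetical destination
    classOf[Text],
    classOf[IntWritable],
    classOf[TextOutputFormat[Text, IntWritable]],
    jobConf)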
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala     2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala  7
2 files changed, 7 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f6703986bd..376e69cd99 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         }
       }
     } else {
-      logWarning ("No need to commit output of task: " + taID.value)
+      logInfo ("No need to commit output of task: " + taID.value)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f6d9d12fe9..51ba8c2d17 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
- hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+
+ // Use configured output committer if already set
+ if (conf.getOutputCommitter == null) {
+ hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+ }
+
FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
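For the EMR scenario from the JIRA, the committer is usually not set in application code at all but comes from the cluster's Hadoop configuration. A rough sketch of that equivalent setup, assuming the old mapred-API property name "mapred.output.committer.class" and a committer class such as the DirectOutputCommitter sketched above being available on the executors' classpath:

import org.apache.hadoop.mapred.JobConf

val jobConf = new JobConf(sc.hadoopConfiguration)
// Same effect as setOutputCommitter, expressed as a Hadoop property so that a
// cluster-wide default (e.g. EMR's) can supply it without any code changes.
jobConf.set("mapred.output.committer.class", classOf[DirectOutputCommitter].getName)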