aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorIan Hummel <ian@themodernlife.net>2014-09-21 13:04:36 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-09-21 13:06:10 -0700
commit7a766577a466377bf504fa2d8c3ca454844a6ea6 (patch)
treea3163f18bb3d2af446f540a74af7d755efe0851f /core/src/main
parentfd8835323f14ad5cac35b4fca5a3aa031c378854 (diff)
downloadspark-7a766577a466377bf504fa2d8c3ca454844a6ea6.tar.gz
spark-7a766577a466377bf504fa2d8c3ca454844a6ea6.tar.bz2
spark-7a766577a466377bf504fa2d8c3ca454844a6ea6.zip
[SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile
Addresses the issue in https://issues.apache.org/jira/browse/SPARK-3595, namely saveAsHadoopFile hardcoding the OutputCommitter. This is not ideal when running Spark jobs that write to S3, especially when running them from an EMR cluster where the default OutputCommitter is a DirectOutputCommitter. Author: Ian Hummel <ian@themodernlife.net> Closes #2450 from themodernlife/spark-3595 and squashes the following commits: f37a0e5 [Ian Hummel] Update based on comments from pwendell a11d9f3 [Ian Hummel] Fix formatting 4359664 [Ian Hummel] Add an example showing usage 8b6be94 [Ian Hummel] Add ability to specify OutputCommitter, especially useful when writing to an S3 bucket from an EMR cluster
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala7
2 files changed, 7 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f6703986bd..376e69cd99 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
}
} else {
- logWarning ("No need to commit output of task: " + taID.value)
+ logInfo ("No need to commit output of task: " + taID.value)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f6d9d12fe9..51ba8c2d17 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
- hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+
+ // Use configured output committer if already set
+ if (conf.getOutputCommitter == null) {
+ hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+ }
+
FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)