author     Ian Hummel <ian@themodernlife.net>    2014-09-21 13:04:36 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2014-09-21 13:04:36 -0700
commit     a0454efe21e5c7ffe1b9bb7b18021a5580952e69 (patch)
tree       9c7df79201b003b81e0c54cb07283a69088860dd /core
parent     d112a6c79dee7b5d8459696f97d329190e8d09a5 (diff)
[SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile
Addresses the issue in https://issues.apache.org/jira/browse/SPARK-3595, namely that saveAsHadoopFile hardcodes the OutputCommitter. This is not ideal when running Spark jobs that write to S3, especially when running them from an EMR cluster where the default OutputCommitter is a DirectOutputCommitter.

Author: Ian Hummel <ian@themodernlife.net>

Closes #2450 from themodernlife/spark-3595 and squashes the following commits:

f37a0e5 [Ian Hummel] Update based on comments from pwendell
a11d9f3 [Ian Hummel] Fix formatting
4359664 [Ian Hummel] Add an example showing usage
8b6be94 [Ian Hummel] Add ability to specify OutputCommitter, especially useful when writing to an S3 bucket from an EMR cluster
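With this change, the committer configured on the JobConf passed to saveAsHadoopFile is honored instead of being overwritten with FileOutputCommitter. A minimal usage sketch, assuming sc is a live SparkContext and com.example.DirectOutputCommitter is a hypothetical stand-in for whatever committer your environment provides (for example, the DirectOutputCommitter shipped on EMR):

    import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
    import org.apache.spark.SparkContext._

    // Name the committer explicitly on the JobConf; saveAsHadoopFile now leaves it in place.
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.setOutputCommitter(classOf[com.example.DirectOutputCommitter])  // hypothetical class

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b")))
    pairs.saveAsHadoopFile(
      "s3://my-bucket/output",                               // illustrative destination
      classOf[java.lang.Integer],
      classOf[String],
      classOf[TextOutputFormat[java.lang.Integer, String]],
      jobConf)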
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala          |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala       |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala  | 107
3 files changed, 91 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f6703986bd..376e69cd99 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
}
} else {
- logWarning ("No need to commit output of task: " + taID.value)
+ logInfo ("No need to commit output of task: " + taID.value)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f6d9d12fe9..51ba8c2d17 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
- hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+
+ // Use configured output committer if already set
+ if (conf.getOutputCommitter == null) {
+ hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+ }
+
FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
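The hunk above only falls back to FileOutputCommitter when the JobConf does not already name a committer; the committer itself still comes from user code or the cluster's Hadoop distribution. As a rough illustration only (not the EMR implementation), a direct-style committer whose tasks write straight to the final location can be as small as the following sketch:

    import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

    // Illustrative only: tasks write directly to the final location, so there is
    // nothing to move or rename at commit time. This just shows the shape of a
    // class that can now be supplied via JobConf.setOutputCommitter.
    class SimpleDirectOutputCommitter extends OutputCommitter {
      override def setupJob(jobContext: JobContext): Unit = ()
      override def setupTask(taskContext: TaskAttemptContext): Unit = ()
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = ()
      override def abortTask(taskContext: TaskAttemptContext): Unit = ()
    }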
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 63d3ddb4af..e84cc69592 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -17,17 +17,21 @@
package org.apache.spark.rdd
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.util.Progressable
+
+import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.util.Random
-import org.scalatest.FunSuite
import com.google.common.io.Files
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.conf.{Configuration, Configurable}
-
-import org.apache.spark.SparkContext._
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
+OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
+TaskAttemptContext => NewTaskAttempContext}
import org.apache.spark.{Partitioner, SharedSparkContext}
+import org.apache.spark.SparkContext._
+import org.scalatest.FunSuite
class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("aggregateByKey") {
@@ -467,7 +471,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
// No error, non-configurable formats still work
- pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
+ pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
/*
Check that configurable formats get configured:
@@ -478,6 +482,17 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
}
+ test("saveAsHadoopFile should respect configured output committers") {
+ val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+ val conf = new JobConf()
+ conf.setOutputCommitter(classOf[FakeOutputCommitter])
+
+ FakeOutputCommitter.ran = false
+ pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
+
+ assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
+ }
+
test("lookup") {
val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
@@ -621,40 +636,86 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile
tries to instantiate them with Class.newInstance.
*/
+
+/*
+ * Original Hadoop API
+ */
class FakeWriter extends RecordWriter[Integer, Integer] {
+ override def write(key: Integer, value: Integer): Unit = ()
- def close(p1: TaskAttemptContext) = ()
+ override def close(reporter: Reporter): Unit = ()
+}
+
+class FakeOutputCommitter() extends OutputCommitter() {
+ override def setupJob(jobContext: JobContext): Unit = ()
+
+ override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
+
+ override def setupTask(taskContext: TaskAttemptContext): Unit = ()
+
+ override def commitTask(taskContext: TaskAttemptContext): Unit = {
+ FakeOutputCommitter.ran = true
+ ()
+ }
+
+ override def abortTask(taskContext: TaskAttemptContext): Unit = ()
+}
+
+/*
+ * Used to communicate state between the test harness and the OutputCommitter.
+ */
+object FakeOutputCommitter {
+ var ran = false
+}
+
+class FakeOutputFormat() extends OutputFormat[Integer, Integer]() {
+ override def getRecordWriter(
+ ignored: FileSystem,
+ job: JobConf, name: String,
+ progress: Progressable): RecordWriter[Integer, Integer] = {
+ new FakeWriter()
+ }
+
+ override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = ()
+}
+
+/*
+ * New-style Hadoop API
+ */
+class NewFakeWriter extends NewRecordWriter[Integer, Integer] {
+
+ def close(p1: NewTaskAttempContext) = ()
def write(p1: Integer, p2: Integer) = ()
}
-class FakeCommitter extends OutputCommitter {
- def setupJob(p1: JobContext) = ()
+class NewFakeCommitter extends NewOutputCommitter {
+ def setupJob(p1: NewJobContext) = ()
- def needsTaskCommit(p1: TaskAttemptContext): Boolean = false
+ def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false
- def setupTask(p1: TaskAttemptContext) = ()
+ def setupTask(p1: NewTaskAttempContext) = ()
- def commitTask(p1: TaskAttemptContext) = ()
+ def commitTask(p1: NewTaskAttempContext) = ()
- def abortTask(p1: TaskAttemptContext) = ()
+ def abortTask(p1: NewTaskAttempContext) = ()
}
-class FakeFormat() extends OutputFormat[Integer, Integer]() {
+class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
- def checkOutputSpecs(p1: JobContext) = ()
+ def checkOutputSpecs(p1: NewJobContext) = ()
- def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
- new FakeWriter()
+ def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+ new NewFakeWriter()
}
- def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = {
- new FakeCommitter()
+ def getOutputCommitter(p1: NewTaskAttempContext): NewOutputCommitter = {
+ new NewFakeCommitter()
}
}
-class ConfigTestFormat() extends FakeFormat() with Configurable {
+class ConfigTestFormat() extends NewFakeFormat() with Configurable {
var setConfCalled = false
def setConf(p1: Configuration) = {
@@ -664,7 +725,7 @@ class ConfigTestFormat() extends FakeFormat() with Configurable {
def getConf: Configuration = null
- override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
+ override def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
assert(setConfCalled, "setConf was never called")
super.getRecordWriter(p1)
}