about summary refs log tree commit diff
diff options
context:
space:
mode:
authorjiangxingbo <jiangxb1987@gmail.com>2016-11-09 13:14:26 -0800
committerReynold Xin <rxin@databricks.com>2016-11-09 13:14:26 -0800
commit64fbdf1aa90b66269daec29f62dc9431c1173bab (patch)
tree68fff8366110c21b6806e0c5537a2882c30cc318
parentd8b81f778af8c3d7112ad37f691c49215b392836 (diff)
downloadspark-64fbdf1aa90b66269daec29f62dc9431c1173bab.tar.gz
spark-64fbdf1aa90b66269daec29f62dc9431c1173bab.tar.bz2
spark-64fbdf1aa90b66269daec29f62dc9431c1173bab.zip
[SPARK-18191][CORE][FOLLOWUP] Call `setConf` if `OutputFormat` is `Configurable`.
## What changes were proposed in this pull request?

We should call `setConf` if `OutputFormat` is `Configurable`, this should be done before we create `OutputCommitter` and `RecordWriter`. This is follow up of #15769, see discussion [here](https://github.com/apache/spark/pull/15769/files#r87064229)

## How was this patch tested?

Add test of this case in `PairRDDFunctionsSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15823 from jiangxb1987/config-format.
-rw-r--r-- core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala | 9
-rw-r--r-- core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala | 9
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 15
3 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index d643a32af0..6b0bcb8f90 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal.io
import java.util.Date
+import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
@@ -42,7 +43,13 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
@transient private var committer: OutputCommitter = _
protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
- context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+ val format = context.getOutputFormatClass.newInstance()
+ // If OutputFormat is Configurable, we should set conf to it.
+ format match {
+ case c: Configurable => c.setConf(context.getConfiguration)
+ case _ => ()
+ }
+ format.getOutputCommitter(context)
}
override def newTaskTempFile(
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index a405c44e10..796439276a 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -23,7 +23,7 @@ import java.util.{Date, Locale}
import scala.reflect.ClassTag
import scala.util.DynamicVariable
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{JobConf, JobID}
import org.apache.hadoop.mapreduce._
@@ -140,7 +140,12 @@ object SparkHadoopMapReduceWriter extends Logging {
SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
// Initiate the writer.
- val taskFormat = outputFormat.newInstance
+ val taskFormat = outputFormat.newInstance()
+ // If OutputFormat is Configurable, we should set conf to it.
+ taskFormat match {
+ case c: Configurable => c.setConf(hadoopConf)
+ case _ => ()
+ }
val writer = taskFormat.getRecordWriter(taskContext)
.asInstanceOf[RecordWriter[K, V]]
require(writer != null, "Unable to obtain RecordWriter")
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index fe547d4d91..02df157be3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -509,6 +509,21 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
(2, ArrayBuffer(1))))
}
+ test("saveNewAPIHadoopFile should call setConf if format is configurable") {
+ val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+
+ // No error, non-configurable formats still work
+ pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
+
+ /*
+ * Check that configurable formats get configured:
+ * ConfigTestFormat throws an exception if we try to write
+ * to it when setConf hasn't been called first.
+ * Assertion is in ConfigTestFormat.getRecordWriter.
+ */
+ pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
+ }
+
test("saveAsHadoopFile should respect configured output committers") {
val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
val conf = new JobConf()