author     Cheng Lian <lian@databricks.com>   2015-06-08 11:34:18 -0700
committer  Yin Huai <yhuai@databricks.com>    2015-06-08 11:34:18 -0700
commit     bbdfc0a40fb39760c122e7b9ce80aa1e340e55ee
tree       2ec66ec8e7dedafa4ee720cd7ad7f129204c6b34 /sql/core
parent     ed5c2dccd0397c4c4b0008c437e6845dd583c9c2
[SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x
For Hadoop 1.x, the `TaskAttemptContext` constructor clones the `Configuration` argument, so configurations made in `HadoopFsRelation.prepareJobForWrite()` are not propagated to the *driver*-side `TaskAttemptContext` (executor-side configurations are propagated properly). Currently this should only affect the Parquet output committer class configuration.

Author: Cheng Lian <lian@databricks.com>

Closes #6669 from liancheng/spark-8121 and squashes the following commits:

73819e8 [Cheng Lian] Minor logging fix
fce089c [Cheng Lian] Adds more logging
b6f78a6 [Cheng Lian] Fixes compilation error introduced while rebasing
963a1aa [Cheng Lian] Addresses @yhuai's comment
c3a0b1a [Cheng Lian] Fixes InsertIntoHadoopFsRelation job initialization
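The following is a minimal, illustrative sketch (assuming only hadoop-common on the classpath; not the actual Spark internals) of the cloning behavior described above, and hence why `prepareJobForWrite` must run before the driver-side `TaskAttemptContext` is created:

import org.apache.hadoop.conf.Configuration

object ConfigurationCloningSketch extends App {
  val jobConf = new Configuration()

  // On Hadoop 1.x, constructing the driver-side TaskAttemptContext effectively performs this copy.
  val driverSideCopy = new Configuration(jobConf)

  // Configuration done afterwards, e.g. in HadoopFsRelation.prepareJobForWrite(job)...
  jobConf.set("spark.sql.sources.outputCommitterClass",
    "org.apache.parquet.hadoop.ParquetOutputCommitter")

  // ...never reaches the copy, so the driver-side context would miss the committer setting.
  assert(driverSideCopy.get("spark.sql.sources.outputCommitterClass") == null)
}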
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                 |  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala      |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala        | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala  | 52
4 files changed, 65 insertions(+), 13 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index c778889045..be786f9b7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -76,6 +76,7 @@ private[spark] object SQLConf {
// The output committer class used by FSBasedRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+ // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
// Whether to perform eager analysis when constructing a dataframe.
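As a usage note for the NOTE added above: the committer class is read from the Hadoop Configuration rather than from SQLConf, so a caller would set it roughly as sketched below (hedged example; the committer shown is just the Parquet default, and any org.apache.hadoop.mapreduce.OutputCommitter subclass could be substituted):

import org.apache.spark.SparkContext

def configureOutputCommitter(sc: SparkContext): Unit = {
  sc.hadoopConfiguration.set(
    "spark.sql.sources.outputCommitterClass",            // the key behind SQLConf.OUTPUT_COMMITTER_CLASS
    "org.apache.parquet.hadoop.ParquetOutputCommitter")  // example value: the Parquet default committer
}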
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 5dda440240..7af4eb1ca4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -212,6 +212,13 @@ private[sql] class ParquetRelation2(
classOf[ParquetOutputCommitter],
classOf[ParquetOutputCommitter])
+ if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+ logInfo("Using default output committer for Parquet: " +
+ classOf[ParquetOutputCommitter].getCanonicalName)
+ } else {
+ logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
+ }
+
conf.setClass(
SQLConf.OUTPUT_COMMITTER_CLASS,
committerClass,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index bd3aad6631..c94199bfcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -297,12 +297,16 @@ private[sql] abstract class BaseWriterContainer(
def driverSideSetup(): Unit = {
setupIDs(0, 0, 0)
setupConf()
- taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
- // This preparation must happen before initializing output format and output committer, since
- // their initialization involves the job configuration, which can be potentially decorated in
- // `relation.prepareJobForWrite`.
+ // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
+ // clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
+ // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
+ //
+ // Also, the `prepareJobForWrite` call must happen before initializing output format and output
+ // committer, since their initialization involve the job configuration, which can be potentially
+ // decorated in `prepareJobForWrite`.
outputWriterFactory = relation.prepareJobForWrite(job)
+ taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
@@ -331,6 +335,8 @@ private[sql] abstract class BaseWriterContainer(
SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
+ logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
@@ -350,7 +356,9 @@ private[sql] abstract class BaseWriterContainer(
}.getOrElse {
// If output committer class is not set, we will use the one associated with the
// file output format.
- outputFormatClass.newInstance().getOutputCommitter(context)
+ val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+ logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
+ outputCommitter
}
}
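For reference, a simplified, self-contained sketch (not the Spark source) of the resolution order the hunk above implements: prefer the committer class configured under SQLConf.OUTPUT_COMMITTER_CLASS in the Hadoop Configuration, otherwise fall back to the committer provided by the output format. The real code has additional handling for committer construction; this sketch assumes a no-arg constructor for brevity.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{OutputCommitter, OutputFormat, TaskAttemptContext}

def resolveCommitter(
    conf: Configuration,
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    context: TaskAttemptContext): OutputCommitter = {
  // Prefer the user-configured committer class, if any.
  val committerClass = conf.getClass(
    "spark.sql.sources.outputCommitterClass", null, classOf[OutputCommitter])

  Option(committerClass).map { clazz =>
    // Assumes a no-arg constructor for brevity.
    clazz.newInstance()
  }.getOrElse {
    // Otherwise use whatever committer the output format itself provides.
    outputFormatClass.newInstance().getOutputCommitter(context)
  }
}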
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 2b6a27032e..46b25859d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -23,16 +23,18 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.scalatest.BeforeAndAfterAll
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.util.DateUtils
@@ -196,7 +198,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
withParquetDataFrame(allNulls :: Nil) { df =>
val rows = df.collect()
- assert(rows.size === 1)
+ assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(5)(null): _*))
}
}
@@ -209,7 +211,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
withParquetDataFrame(allNones :: Nil) { df =>
val rows = df.collect()
- assert(rows.size === 1)
+ assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(3)(null): _*))
}
}
@@ -379,6 +381,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
test("SPARK-6352 DirectParquetOutputCommitter") {
+ val clonedConf = new Configuration(configuration)
+
// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
@@ -393,14 +397,46 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val fs = path.getFileSystem(configuration)
assert(!fs.exists(path))
}
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
}
- finally {
- configuration.set("spark.sql.parquet.output.committer.class",
- "org.apache.parquet.hadoop.ParquetOutputCommitter")
+ }
+
+ test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
+ withTempPath { dir =>
+ val clonedConf = new Configuration(configuration)
+
+ configuration.set(
+ SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)
+
+ configuration.set(
+ "spark.sql.parquet.output.committer.class",
+ classOf[BogusParquetOutputCommitter].getCanonicalName)
+
+ try {
+ val message = intercept[SparkException] {
+ sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
+ }.getCause.getMessage
+ assert(message === "Intentional exception for testing purposes")
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ }
}
}
}
+class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+ extends ParquetOutputCommitter(outputPath, context) {
+
+ override def commitJob(jobContext: JobContext): Unit = {
+ sys.error("Intentional exception for testing purposes")
+ }
+}
+
class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi