diff options
author | Reynold Xin <rxin@databricks.com> | 2016-04-26 10:58:56 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-04-26 10:58:56 -0700 |
commit | 5cb03220a02c70d343e82d69cfd30edb894595a1 (patch) | |
tree | 41232454ab88f2c9c1feadd13d43b8cc46873cf5 /sql/hive/src | |
parent | 92f66331b4ba3634f54f57ddb5e7962b14aa4ca1 (diff) | |
download | spark-5cb03220a02c70d343e82d69cfd30edb894595a1.tar.gz spark-5cb03220a02c70d343e82d69cfd30edb894595a1.tar.bz2 spark-5cb03220a02c70d343e82d69cfd30edb894595a1.zip |
[SPARK-14912][SQL] Propagate data source options to Hadoop configuration
## What changes were proposed in this pull request?
We currently have no way for users to propagate options to the underlying library that relies on Hadoop configurations to work. For example, there are various options in parquet-mr that users might want to set, but the data source API does not expose a per-job way to set them. This patch propagates the user-specified options also into the Hadoop Configuration.
## How was this patch tested?
Used a mock data source implementation to test both the read path and the write path.
Author: Reynold Xin <rxin@databricks.com>
Closes #12688 from rxin/SPARK-14912.
Diffstat (limited to 'sql/hive/src')
3 files changed, 50 insertions, 22 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index cb49fc910b..4f81967a5b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -114,22 +114,21 @@ private[sql] class DefaultSource partitionSchema: StructType, requiredSchema: StructType, filters: Seq[Filter], - options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { - val orcConf = new Configuration(sparkSession.sessionState.hadoopConf) - + options: Map[String, String], + hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { if (sparkSession.sessionState.conf.orcFilterPushDown) { // Sets pushed predicates OrcFilters.createFilter(filters.toArray).foreach { f => - orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo) - orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) + hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo) + hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) } } - val broadcastedConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(orcConf)) + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) (file: PartitionedFile) => { - val conf = broadcastedConf.value.value + val conf = broadcastedHadoopConf.value.value // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file @@ -154,7 +153,7 @@ private[sql] class DefaultSource // Specifically would be helpful for partitioned datasets. 
val orcReader = OrcFile.createReader( new Path(new URI(file.filePath)), OrcFile.readerOptions(conf)) - new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength()) + new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength) } // Unwraps `OrcStruct`s to `UnsafeRow`s diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index 71e3457d25..9ad0887609 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -65,4 +65,27 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat .load(file.getCanonicalPath)) } } + + test("test hadoop conf option propagation") { + withTempPath { file => + // Test write side + val df = sqlContext.range(10).selectExpr("cast(id as string)") + df.write + .option("some-random-write-option", "hahah-WRITE") + .option("some-null-value-option", null) // test null robustness + .option("dataSchema", df.schema.json) + .format(dataSourceName).save(file.getAbsolutePath) + assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-write-option") == "hahah-WRITE") + + // Test read side + val df1 = sqlContext.read + .option("some-random-read-option", "hahah-READ") + .option("some-null-value-option", null) // test null robustness + .option("dataSchema", df.schema.json) + .format(dataSourceName) + .load(file.getAbsolutePath) + df1.count() + assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-read-option") == "hahah-READ") + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index e4bd1f93c5..0fa1841415 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -47,13 +47,16 @@ class SimpleTextSource extends FileFormat with DataSourceRegister { sparkSession: SparkSession, job: Job, options: Map[String, String], - dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory { - override def newInstance( - path: String, - bucketId: Option[Int], - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - new SimpleTextOutputWriter(path, context) + dataSchema: StructType): OutputWriterFactory = { + SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration) + new OutputWriterFactory { + override def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new SimpleTextOutputWriter(path, context) + } } } @@ -63,8 +66,9 @@ class SimpleTextSource extends FileFormat with DataSourceRegister { partitionSchema: StructType, requiredSchema: StructType, filters: Seq[Filter], - options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { - + options: Map[String, String], + hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + SimpleTextRelation.lastHadoopConf = Option(hadoopConf) SimpleTextRelation.requiredColumns = requiredSchema.fieldNames SimpleTextRelation.pushedFilters = filters.toSet @@ -74,9 +78,8 @@ class SimpleTextSource extends FileFormat with DataSourceRegister { inputAttributes.find(_.name == field.name) } - val conf = new Configuration(sparkSession.sessionState.hadoopConf) - val broadcastedConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf)) + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) (file: PartitionedFile) => { val predicate = { @@ -95,7 +98,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister { val 
projection = new InterpretedProjection(outputAttributes, inputAttributes) val unsafeRowIterator = - new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line => + new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line => val record = line.toString new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map { case (v, dataType) => @@ -164,4 +167,7 @@ object SimpleTextRelation { // Used to test failure callback var callbackCalled = false + + // Used by the test case to check the value propagated in the hadoop confs. + var lastHadoopConf: Option[Configuration] = None } |