author     Reynold Xin <rxin@databricks.com>    2016-04-26 10:58:56 -0700
committer  Yin Huai <yhuai@databricks.com>      2016-04-26 10:58:56 -0700
commit     5cb03220a02c70d343e82d69cfd30edb894595a1 (patch)
tree       41232454ab88f2c9c1feadd13d43b8cc46873cf5 /sql/hive/src
parent     92f66331b4ba3634f54f57ddb5e7962b14aa4ca1 (diff)
[SPARK-14912][SQL] Propagate data source options to Hadoop configuration
## What changes were proposed in this pull request?

We currently have no way for users to propagate options to the underlying libraries that rely on Hadoop configurations to work. For example, there are various options in parquet-mr that users might want to set, but the data source API does not expose a per-job way to set them. This patch propagates the user-specified options into the Hadoop Configuration as well.

## How was this patch tested?

Used a mock data source implementation to test both the read path and the write path.

Author: Reynold Xin <rxin@databricks.com>

Closes #12688 from rxin/SPARK-14912.
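As a usage sketch (not part of the patch itself): with this change, keys passed through the reader/writer `option()` API are copied into the per-job Hadoop `Configuration` that the file format sees. The parquet-mr keys and the `spark` session name below are illustrative assumptions; whether a given key takes effect depends on the underlying library.

```scala
// Sketch only: assumes a SparkSession named `spark` and that the Parquet data source
// honors parquet-mr keys forwarded through the per-job Hadoop Configuration.
spark.range(10).write
  .option("parquet.block.size", (64 * 1024 * 1024).toString) // copied into the Hadoop conf on the write path
  .parquet("/tmp/spark-14912-demo")

spark.read
  .option("parquet.task.side.metadata", "true") // read-path options are forwarded the same way
  .parquet("/tmp/spark-14912-demo")
  .count()
```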
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                     17
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala  23
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala               32
3 files changed, 50 insertions, 22 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index cb49fc910b..4f81967a5b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -114,22 +114,21 @@ private[sql] class DefaultSource
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
- val orcConf = new Configuration(sparkSession.sessionState.hadoopConf)
-
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
if (sparkSession.sessionState.conf.orcFilterPushDown) {
// Sets pushed predicates
OrcFilters.createFilter(filters.toArray).foreach { f =>
- orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
- orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+ hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+ hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
}
}
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(orcConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
- val conf = broadcastedConf.value.value
+ val conf = broadcastedHadoopConf.value.value
// SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
// case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
@@ -154,7 +153,7 @@ private[sql] class DefaultSource
// Specifically would be helpful for partitioned datasets.
val orcReader = OrcFile.createReader(
new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
- new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength())
+ new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
}
// Unwraps `OrcStruct`s to `UnsafeRow`s
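The hunk above also captures the pattern that comes with the new parameter: predicate-pushdown settings are applied to the per-job `hadoopConf` argument rather than a freshly copied `Configuration`, and the result is broadcast via `SerializableConfiguration` so each task can rebuild it on the executor. A minimal sketch of that pattern, assuming access to Spark's internal `SerializableConfiguration` and spelling out the literal key behind `ConfVars.HIVEOPTINDEXFILTER`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SerializableConfiguration // Spark-internal helper, as used in the diff above

// Driver side: mutate the per-job conf that buildReader now receives, then broadcast it
// so every task can rebuild a Configuration on the executor.
def broadcastJobConf(spark: SparkSession, hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
  // Literal key corresponding to ConfVars.HIVEOPTINDEXFILTER in the hunk above (spelled out here for clarity).
  hadoopConf.setBoolean("hive.optimize.index.filter", true)
  spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
}

// Executor side, inside the per-file closure (mirrors the diff above):
//   val conf: Configuration = broadcastedHadoopConf.value.value
```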
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 71e3457d25..9ad0887609 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -65,4 +65,27 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
.load(file.getCanonicalPath))
}
}
+
+ test("test hadoop conf option propagation") {
+ withTempPath { file =>
+ // Test write side
+ val df = sqlContext.range(10).selectExpr("cast(id as string)")
+ df.write
+ .option("some-random-write-option", "hahah-WRITE")
+ .option("some-null-value-option", null) // test null robustness
+ .option("dataSchema", df.schema.json)
+ .format(dataSourceName).save(file.getAbsolutePath)
+ assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-write-option") == "hahah-WRITE")
+
+ // Test read side
+ val df1 = sqlContext.read
+ .option("some-random-read-option", "hahah-READ")
+ .option("some-null-value-option", null) // test null robustness
+ .option("dataSchema", df.schema.json)
+ .format(dataSourceName)
+ .load(file.getAbsolutePath)
+ df1.count()
+ assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-read-option") == "hahah-READ")
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index e4bd1f93c5..0fa1841415 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -47,13 +47,16 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
- override def newInstance(
- path: String,
- bucketId: Option[Int],
- dataSchema: StructType,
- context: TaskAttemptContext): OutputWriter = {
- new SimpleTextOutputWriter(path, context)
+ dataSchema: StructType): OutputWriterFactory = {
+ SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ bucketId: Option[Int],
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new SimpleTextOutputWriter(path, context)
+ }
}
}
@@ -63,8 +66,9 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
-
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+ SimpleTextRelation.lastHadoopConf = Option(hadoopConf)
SimpleTextRelation.requiredColumns = requiredSchema.fieldNames
SimpleTextRelation.pushedFilters = filters.toSet
@@ -74,9 +78,8 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
inputAttributes.find(_.name == field.name)
}
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val predicate = {
@@ -95,7 +98,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
val projection = new InterpretedProjection(outputAttributes, inputAttributes)
val unsafeRowIterator =
- new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+ new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
val record = line.toString
new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map {
case (v, dataType) =>
@@ -164,4 +167,7 @@ object SimpleTextRelation {
// Used to test failure callback
var callbackCalled = false
+
+ // Used by the test case to check the value propagated in the hadoop confs.
+ var lastHadoopConf: Option[Configuration] = None
}