-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala  23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala  12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala  17
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala  23
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala  32
12 files changed, 99 insertions, 57 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index f07374ac0c..ba2e1e2bc2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -181,20 +181,20 @@ class DefaultSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
verifySchema(dataSchema)
val numFeatures = options("numFeatures").toInt
assert(numFeatures > 0)
val sparse = options.getOrElse("vectorType", "sparse") == "sparse"
- val broadcastedConf = sparkSession.sparkContext.broadcast(
- new SerializableConfiguration(
- new Configuration(sparkSession.sparkContext.hadoopConfiguration)))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val points =
- new HadoopFileLinesReader(file, broadcastedConf.value.value)
+ new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
.map(_.toString.trim)
.filterNot(line => line.isEmpty || line.startsWith("#"))
.map { line =>
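
The broadcast-and-read pattern introduced in this hunk recurs in the CSV, JSON, text, and ORC hunks below. A minimal standalone sketch of just that pattern follows; the wrapper method and its name are made up for illustration, and SerializableConfiguration and HadoopFileLinesReader are Spark-internal classes, so a sketch like this only compiles from code living inside an org.apache.spark package.

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.util.SerializableConfiguration

// Illustrative helper (not part of the patch): turns the driver-side hadoopConf into a
// per-file line iterator usable on executors. Hadoop's Configuration is not Serializable,
// so it is wrapped in SerializableConfiguration and broadcast once per query.
def buildLineReader(
    sparkSession: SparkSession,
    hadoopConf: Configuration): PartitionedFile => Iterator[String] = {
  val broadcastedHadoopConf =
    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

  (file: PartitionedFile) => {
    // Executors unwrap the broadcast copy; the real readers then parse each line into InternalRow.
    new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
      .map(_.toString.trim)
      .filterNot(line => line.isEmpty || line.startsWith("#"))
  }
}
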
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 9e1308bed5..c26cae84d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable.ArrayBuffer
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.internal.Logging
@@ -106,13 +107,17 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
+ val hadoopConf = new Configuration(files.sparkSession.sessionState.hadoopConf)
+ files.options.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
+
val readFile = files.fileFormat.buildReader(
sparkSession = files.sparkSession,
dataSchema = files.dataSchema,
partitionSchema = files.partitionSchema,
requiredSchema = prunedDataSchema,
filters = pushedDownFilters,
- options = files.options)
+ options = files.options,
+ hadoopConf = hadoopConf)
val plannedPartitions = files.bucketSpec match {
case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
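
The two added lines above are the read-side half of the option propagation; the write side (next file) does the same on job.getConfiguration. A standalone sketch of the copy step, with baseConf and options standing in for files.sparkSession.sessionState.hadoopConf and files.options:

import org.apache.hadoop.conf.Configuration

// Illustrative helper: copies user-supplied data source options into a fresh Hadoop
// Configuration without mutating the shared session conf. Null option values are
// skipped, which is exactly the robustness case the SimpleText test below exercises.
def hadoopConfWithOptions(baseConf: Configuration, options: Map[String, String]): Configuration = {
  val hadoopConf = new Configuration(baseConf) // defensive copy, scoped to this query
  options.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
  hadoopConf
}
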
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index b2483e69a6..fa954975b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -106,6 +106,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
val job = Job.getInstance(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
+
+ // Also set the options in Hadoop Configuration
+ options.foreach { case (k, v) => if (v ne null) job.getConfiguration.set(k, v) }
+
FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
val partitionSet = AttributeSet(partitionColumns)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index fb047ff867..8ca105d923 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -99,16 +99,17 @@ class DefaultSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
val csvOptions = new CSVOptions(options)
val headers = requiredSchema.fields.map(_.name)
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val lineIterator = {
- val conf = broadcastedConf.value.value
+ val conf = broadcastedHadoopConf.value.value
new HadoopFileLinesReader(file, conf).map { line =>
new String(line.getBytes, 0, line.getLength, csvOptions.charset)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 2628788ad3..3058e79201 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -217,7 +217,8 @@ trait FileFormat {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
// TODO: Remove this default implementation when the other formats have been ported
// Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
throw new UnsupportedOperationException(s"buildReader is not supported for $this")
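
Putting this hunk together with the call site in FileSourceStrategy, the contract every file format now implements looks roughly as follows. The trait name is a stand-in for the real FileFormat, and the types of the leading parameters are inferred from the named arguments at the call site.

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Sketch of the post-patch reader contract; formats that have not been ported yet keep
// the throwing default, and FileSourceStrategy guards against calling them.
trait FileFormatReaderSketch {
  def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
    throw new UnsupportedOperationException(s"buildReader is not supported for $this")
  }
}
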
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index f9c34c6bb5..b6b3907e3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -97,10 +97,10 @@ class DefaultSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val parsedOptions: JSONOptions = new JSONOptions(options)
val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
@@ -109,8 +109,8 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
- file => {
- val lines = new HadoopFileLinesReader(file, broadcastedConf.value.value).map(_.toString)
+ (file: PartitionedFile) => {
+ val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
val rows = JacksonParser.parseJson(
lines,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b156581564..5be8770790 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -262,13 +262,13 @@ private[sql] class DefaultSource
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val parquetConf = new Configuration(sparkSession.sessionState.hadoopConf)
- parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
- parquetConf.set(
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+ hadoopConf.set(
CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
- parquetConf.set(
+ hadoopConf.set(
CatalystWriteSupport.SPARK_ROW_SCHEMA,
CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
@@ -276,13 +276,13 @@ private[sql] class DefaultSource
// This metadata is only useful for detecting optional columns when pushing down filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
requiredSchema).asInstanceOf[StructType]
- CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
+ CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
// Sets flags for `CatalystSchemaConverter`
- parquetConf.setBoolean(
+ hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
- parquetConf.setBoolean(
+ hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
@@ -298,8 +298,8 @@ private[sql] class DefaultSource
None
}
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(parquetConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
@@ -327,7 +327,8 @@ private[sql] class DefaultSource
null)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
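
On the executor side, the broadcast conf is what feeds the Hadoop TaskAttemptContext wrapped around the Parquet record reader. A compact sketch of that step; the helper name is made up, while the constructors are the ones used in the hunk above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Illustrative helper: rebuilds a TaskAttemptContext from the broadcast Configuration.
// The attempt id is synthetic; only the Configuration matters to the record reader.
def attemptContextFor(conf: Configuration): TaskAttemptContextImpl = {
  val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
  new TaskAttemptContextImpl(conf, attemptId)
}
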
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index a0d680c708..348edfcf7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -89,17 +89,17 @@ class DefaultSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- file => {
+ (file: PartitionedFile) => {
val unsafeRow = new UnsafeRow(1)
val bufferHolder = new BufferHolder(unsafeRow)
val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
- new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+ new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
// Writes to an UnsafeRow directly
bufferHolder.reset()
unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 9da0af3a76..f73d485acf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
import org.apache.hadoop.mapreduce.Job
@@ -476,7 +477,8 @@ class TestFileFormat extends FileFormat {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
// Record the arguments so they can be checked in the test case.
LastArguments.partitionSchema = partitionSchema
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index cb49fc910b..4f81967a5b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -114,22 +114,21 @@ private[sql] class DefaultSource
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
- val orcConf = new Configuration(sparkSession.sessionState.hadoopConf)
-
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
if (sparkSession.sessionState.conf.orcFilterPushDown) {
// Sets pushed predicates
OrcFilters.createFilter(filters.toArray).foreach { f =>
- orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
- orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+ hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+ hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
}
}
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(orcConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
- val conf = broadcastedConf.value.value
+ val conf = broadcastedHadoopConf.value.value
// SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
// case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
@@ -154,7 +153,7 @@ private[sql] class DefaultSource
// Specifically would be helpful for partitioned datasets.
val orcReader = OrcFile.createReader(
new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
- new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength())
+ new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
}
// Unwraps `OrcStruct`s to `UnsafeRow`s
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 71e3457d25..9ad0887609 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -65,4 +65,27 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
.load(file.getCanonicalPath))
}
}
+
+ test("test hadoop conf option propagation") {
+ withTempPath { file =>
+ // Test write side
+ val df = sqlContext.range(10).selectExpr("cast(id as string)")
+ df.write
+ .option("some-random-write-option", "hahah-WRITE")
+ .option("some-null-value-option", null) // test null robustness
+ .option("dataSchema", df.schema.json)
+ .format(dataSourceName).save(file.getAbsolutePath)
+ assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-write-option") == "hahah-WRITE")
+
+ // Test read side
+ val df1 = sqlContext.read
+ .option("some-random-read-option", "hahah-READ")
+ .option("some-null-value-option", null) // test null robustness
+ .option("dataSchema", df.schema.json)
+ .format(dataSourceName)
+ .load(file.getAbsolutePath)
+ df1.count()
+ assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-read-option") == "hahah-READ")
+ }
+ }
}
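
Seen from user code, the propagation this test exercises amounts to the following sketch. The path and option names are illustrative (the option values mirror the test), and the built-in text source stands in for the SimpleText test format.

import org.apache.spark.sql.SparkSession

// Illustrative end-to-end usage: options set on the reader/writer now also land in the
// Hadoop Configuration handed to the format (job.getConfiguration on the write path,
// the new hadoopConf parameter of buildReader on the read path).
object ConfPropagationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("conf-propagation").getOrCreate()
    val path = "/tmp/conf-propagation-demo" // illustrative location

    val df = spark.range(10).selectExpr("cast(id as string)")
    df.write
      .option("some-random-write-option", "hahah-WRITE")
      .format("text")
      .mode("overwrite")
      .save(path)

    spark.read
      .option("some-random-read-option", "hahah-READ")
      .format("text")
      .load(path)
      .count()

    spark.stop()
  }
}
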
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index e4bd1f93c5..0fa1841415 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -47,13 +47,16 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
- override def newInstance(
- path: String,
- bucketId: Option[Int],
- dataSchema: StructType,
- context: TaskAttemptContext): OutputWriter = {
- new SimpleTextOutputWriter(path, context)
+ dataSchema: StructType): OutputWriterFactory = {
+ SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ bucketId: Option[Int],
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new SimpleTextOutputWriter(path, context)
+ }
}
}
@@ -63,8 +66,9 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
-
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+ SimpleTextRelation.lastHadoopConf = Option(hadoopConf)
SimpleTextRelation.requiredColumns = requiredSchema.fieldNames
SimpleTextRelation.pushedFilters = filters.toSet
@@ -74,9 +78,8 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
inputAttributes.find(_.name == field.name)
}
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val predicate = {
@@ -95,7 +98,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
val projection = new InterpretedProjection(outputAttributes, inputAttributes)
val unsafeRowIterator =
- new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+ new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
val record = line.toString
new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map {
case (v, dataType) =>
@@ -164,4 +167,7 @@ object SimpleTextRelation {
// Used to test failure callback
var callbackCalled = false
+
+ // Used by the test case to check the values propagated into the Hadoop conf.
+ var lastHadoopConf: Option[Configuration] = None
}