author     Reynold Xin <rxin@databricks.com>  2016-04-26 10:58:56 -0700
committer  Yin Huai <yhuai@databricks.com>    2016-04-26 10:58:56 -0700
commit     5cb03220a02c70d343e82d69cfd30edb894595a1
tree       41232454ab88f2c9c1feadd13d43b8cc46873cf5
parent     92f66331b4ba3634f54f57ddb5e7962b14aa4ca1
[SPARK-14912][SQL] Propagate data source options to Hadoop configuration
## What changes were proposed in this pull request?

We currently have no way for users to propagate options to the underlying libraries that rely on Hadoop configurations to work. For example, there are various options in parquet-mr that users might want to set, but the data source API does not expose a per-job way to set them. This patch also propagates the user-specified options into the Hadoop Configuration.

## How was this patch tested?

Used a mock data source implementation to test both the read path and the write path.

Author: Reynold Xin <rxin@databricks.com>

Closes #12688 from rxin/SPARK-14912.
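For context, a minimal usage sketch (not part of this patch): after this change, any key/value passed via `.option(...)` on the read or write path is also copied into the Hadoop Configuration handed to the file format. The option key `parquet.block.size` (a parquet-mr setting), the value, and the output path below are illustrative, and the snippet assumes a `sqlContext` as in the test suite.

```scala
// Sketch only: per-job Hadoop option propagation, assuming a SQLContext named
// sqlContext (as in the test suite). Key, value, and path are illustrative.
val df = sqlContext.range(10).selectExpr("cast(id as string) as value")

// Write path: each non-null option is also set on job.getConfiguration before writing.
df.write
  .option("parquet.block.size", (64 * 1024 * 1024).toString)
  .parquet("/tmp/spark-14912-demo")

// Read path: the same options reach the hadoopConf passed to FileFormat.buildReader.
sqlContext.read
  .option("parquet.block.size", (64 * 1024 * 1024).toString)
  .parquet("/tmp/spark-14912-demo")
  .count()
```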
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala                     10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala      7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala        9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala     3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala       12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala      12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                          17
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala       23
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala                    32
12 files changed, 99 insertions, 57 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index f07374ac0c..ba2e1e2bc2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -181,20 +181,20 @@ class DefaultSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
verifySchema(dataSchema)
val numFeatures = options("numFeatures").toInt
assert(numFeatures > 0)
val sparse = options.getOrElse("vectorType", "sparse") == "sparse"
- val broadcastedConf = sparkSession.sparkContext.broadcast(
- new SerializableConfiguration(
- new Configuration(sparkSession.sparkContext.hadoopConfiguration)))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val points =
- new HadoopFileLinesReader(file, broadcastedConf.value.value)
+ new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
.map(_.toString.trim)
.filterNot(line => line.isEmpty || line.startsWith("#"))
.map { line =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 9e1308bed5..c26cae84d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable.ArrayBuffer
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.internal.Logging
@@ -106,13 +107,17 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
+ val hadoopConf = new Configuration(files.sparkSession.sessionState.hadoopConf)
+ files.options.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
+
val readFile = files.fileFormat.buildReader(
sparkSession = files.sparkSession,
dataSchema = files.dataSchema,
partitionSchema = files.partitionSchema,
requiredSchema = prunedDataSchema,
filters = pushedDownFilters,
- options = files.options)
+ options = files.options,
+ hadoopConf = hadoopConf)
val plannedPartitions = files.bucketSpec match {
case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index b2483e69a6..fa954975b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -106,6 +106,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
val job = Job.getInstance(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
+
+ // Also set the options in Hadoop Configuration
+ options.foreach { case (k, v) => if (v ne null) job.getConfiguration.set(k, v) }
+
FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
val partitionSet = AttributeSet(partitionColumns)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index fb047ff867..8ca105d923 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -99,16 +99,17 @@ class DefaultSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
val csvOptions = new CSVOptions(options)
val headers = requiredSchema.fields.map(_.name)
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val lineIterator = {
- val conf = broadcastedConf.value.value
+ val conf = broadcastedHadoopConf.value.value
new HadoopFileLinesReader(file, conf).map { line =>
new String(line.getBytes, 0, line.getLength, csvOptions.charset)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 2628788ad3..3058e79201 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -217,7 +217,8 @@ trait FileFormat {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
// TODO: Remove this default implementation when the other formats have been ported
// Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
throw new UnsupportedOperationException(s"buildReader is not supported for $this")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index f9c34c6bb5..b6b3907e3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -97,10 +97,10 @@ class DefaultSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val parsedOptions: JSONOptions = new JSONOptions(options)
val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
@@ -109,8 +109,8 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
- file => {
- val lines = new HadoopFileLinesReader(file, broadcastedConf.value.value).map(_.toString)
+ (file: PartitionedFile) => {
+ val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
val rows = JacksonParser.parseJson(
lines,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b156581564..5be8770790 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -262,13 +262,13 @@ private[sql] class DefaultSource
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val parquetConf = new Configuration(sparkSession.sessionState.hadoopConf)
- parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
- parquetConf.set(
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+ hadoopConf.set(
CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
- parquetConf.set(
+ hadoopConf.set(
CatalystWriteSupport.SPARK_ROW_SCHEMA,
CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
@@ -276,13 +276,13 @@ private[sql] class DefaultSource
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
requiredSchema).asInstanceOf[StructType]
- CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
+ CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
// Sets flags for `CatalystSchemaConverter`
- parquetConf.setBoolean(
+ hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
- parquetConf.setBoolean(
+ hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
@@ -298,8 +298,8 @@ private[sql] class DefaultSource
None
}
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(parquetConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
@@ -327,7 +327,8 @@ private[sql] class DefaultSource
null)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index a0d680c708..348edfcf7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -89,17 +89,17 @@ class DefaultSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- file => {
+ (file: PartitionedFile) => {
val unsafeRow = new UnsafeRow(1)
val bufferHolder = new BufferHolder(unsafeRow)
val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
- new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+ new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
// Writes to an UnsafeRow directly
bufferHolder.reset()
unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 9da0af3a76..f73d485acf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
import org.apache.hadoop.mapreduce.Job
@@ -476,7 +477,8 @@ class TestFileFormat extends FileFormat {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
// Record the arguments so they can be checked in the test case.
LastArguments.partitionSchema = partitionSchema
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index cb49fc910b..4f81967a5b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -114,22 +114,21 @@ private[sql] class DefaultSource
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
- val orcConf = new Configuration(sparkSession.sessionState.hadoopConf)
-
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
if (sparkSession.sessionState.conf.orcFilterPushDown) {
// Sets pushed predicates
OrcFilters.createFilter(filters.toArray).foreach { f =>
- orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
- orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+ hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+ hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
}
}
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(orcConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
- val conf = broadcastedConf.value.value
+ val conf = broadcastedHadoopConf.value.value
// SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
// case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
@@ -154,7 +153,7 @@ private[sql] class DefaultSource
// Specifically would be helpful for partitioned datasets.
val orcReader = OrcFile.createReader(
new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
- new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength())
+ new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
}
// Unwraps `OrcStruct`s to `UnsafeRow`s
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 71e3457d25..9ad0887609 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -65,4 +65,27 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
.load(file.getCanonicalPath))
}
}
+
+ test("test hadoop conf option propagation") {
+ withTempPath { file =>
+ // Test write side
+ val df = sqlContext.range(10).selectExpr("cast(id as string)")
+ df.write
+ .option("some-random-write-option", "hahah-WRITE")
+ .option("some-null-value-option", null) // test null robustness
+ .option("dataSchema", df.schema.json)
+ .format(dataSourceName).save(file.getAbsolutePath)
+ assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-write-option") == "hahah-WRITE")
+
+ // Test read side
+ val df1 = sqlContext.read
+ .option("some-random-read-option", "hahah-READ")
+ .option("some-null-value-option", null) // test null robustness
+ .option("dataSchema", df.schema.json)
+ .format(dataSourceName)
+ .load(file.getAbsolutePath)
+ df1.count()
+ assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-read-option") == "hahah-READ")
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index e4bd1f93c5..0fa1841415 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -47,13 +47,16 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
- override def newInstance(
- path: String,
- bucketId: Option[Int],
- dataSchema: StructType,
- context: TaskAttemptContext): OutputWriter = {
- new SimpleTextOutputWriter(path, context)
+ dataSchema: StructType): OutputWriterFactory = {
+ SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ bucketId: Option[Int],
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new SimpleTextOutputWriter(path, context)
+ }
}
}
@@ -63,8 +66,9 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
-
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+ SimpleTextRelation.lastHadoopConf = Option(hadoopConf)
SimpleTextRelation.requiredColumns = requiredSchema.fieldNames
SimpleTextRelation.pushedFilters = filters.toSet
@@ -74,9 +78,8 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
inputAttributes.find(_.name == field.name)
}
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val predicate = {
@@ -95,7 +98,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
val projection = new InterpretedProjection(outputAttributes, inputAttributes)
val unsafeRowIterator =
- new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+ new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
val record = line.toString
new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map {
case (v, dataType) =>
@@ -164,4 +167,7 @@ object SimpleTextRelation {
// Used to test failure callback
var callbackCalled = false
+
+ // Used by the test case to check the value propagated in the hadoop confs.
+ var lastHadoopConf: Option[Configuration] = None
}