aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala9
1 files changed, 5 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index fb047ff867..8ca105d923 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -99,16 +99,17 @@ class DefaultSource extends FileFormat with DataSourceRegister {
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
val csvOptions = new CSVOptions(options)
val headers = requiredSchema.fields.map(_.name)
- val conf = new Configuration(sparkSession.sessionState.hadoopConf)
- val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
val lineIterator = {
- val conf = broadcastedConf.value.value
+ val conf = broadcastedHadoopConf.value.value
new HadoopFileLinesReader(file, conf).map { line =>
new String(line.getBytes, 0, line.getLength, csvOptions.charset)
}