diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index fb047ff867..8ca105d923 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -99,16 +99,17 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
     val csvOptions = new CSVOptions(options)
     val headers = requiredSchema.fields.map(_.name)
-    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
-    val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     (file: PartitionedFile) => {
       val lineIterator = {
-        val conf = broadcastedConf.value.value
+        val conf = broadcastedHadoopConf.value.value
         new HadoopFileLinesReader(file, conf).map { line =>
           new String(line.getBytes, 0, line.getLength, csvOptions.charset)
         }