Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 23
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b156581564..5be8770790 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -262,13 +262,13 @@ private[sql] class DefaultSource
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val parquetConf = new Configuration(sparkSession.sessionState.hadoopConf)
- parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
- parquetConf.set(
+ options: Map[String, String],
+ hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+ hadoopConf.set(
CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
- parquetConf.set(
+ hadoopConf.set(
CatalystWriteSupport.SPARK_ROW_SCHEMA,
CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
@@ -276,13 +276,13 @@ private[sql] class DefaultSource
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
requiredSchema).asInstanceOf[StructType]
- CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
+ CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
// Sets flags for `CatalystSchemaConverter`
- parquetConf.setBoolean(
+ hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
- parquetConf.setBoolean(
+ hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
@@ -298,8 +298,8 @@ private[sql] class DefaultSource
None
}
- val broadcastedConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(parquetConf))
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
@@ -327,7 +327,8 @@ private[sql] class DefaultSource
null)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
+ val hadoopAttemptContext =
+ new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
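
For reference, a minimal sketch of how a caller might use the new signature introduced by this diff. The method name (buildReader), its leading parameters, and the caller-side names (parquetFormat, dataSchema, readFile) are assumptions for illustration; only the trailing parameters and the use of sparkSession.sessionState.hadoopConf appear in the hunks above.

// Hypothetical call site, not part of this diff: after this change the Hadoop
// Configuration is constructed once by the caller (for example by copying
// sparkSession.sessionState.hadoopConf, as the removed line used to do) and
// passed in, instead of being built inside the method as `parquetConf`.
val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf)
val readFile: PartitionedFile => Iterator[InternalRow] =
  parquetFormat.buildReader(
    sparkSession,
    dataSchema,
    partitionSchema,
    requiredSchema,
    filters,
    options,
    hadoopConf)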