Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala  23
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b156581564..5be8770790 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -262,13 +262,13 @@ private[sql] class DefaultSource
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val parquetConf = new Configuration(sparkSession.sessionState.hadoopConf)
-    parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
-    parquetConf.set(
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+    hadoopConf.set(
       CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
       CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
-    parquetConf.set(
+    hadoopConf.set(
       CatalystWriteSupport.SPARK_ROW_SCHEMA,
       CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
 
@@ -276,13 +276,13 @@ private[sql] class DefaultSource
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       requiredSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
+    CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
-    parquetConf.setBoolean(
+    hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
       sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
-    parquetConf.setBoolean(
+    hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
 
@@ -298,8 +298,8 @@ private[sql] class DefaultSource
       None
     }
 
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(parquetConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
@@ -327,7 +327,8 @@ private[sql] class DefaultSource
         null)
 
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-      val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
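
Note on the pattern this change leans on: buildReader no longer constructs its own Configuration from sparkSession.sessionState.hadoopConf; the caller passes hadoopConf in, the method sets the Parquet read/write options on it, and then wraps it in SerializableConfiguration and broadcasts it so each task can rebuild a TaskAttemptContextImpl from the same settings. The sketch below is a minimal, self-contained illustration of that broadcast pattern, not code from this commit: SerializableHadoopConf, BroadcastConfExample, and the example.reader.option key are hypothetical names, and a local wrapper is used because Spark's own org.apache.spark.util.SerializableConfiguration is private[spark].

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

// Hadoop's Configuration is not java.io.Serializable, so it cannot be captured in a task
// closure directly; this wrapper round-trips it through its Writable write/readFields methods.
class SerializableHadoopConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

object BroadcastConfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("broadcast-conf").getOrCreate()

    // Driver side: set reader options once, then broadcast the whole Configuration.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("example.reader.option", "true")
    val broadcastedConf = spark.sparkContext.broadcast(new SerializableHadoopConf(hadoopConf))

    // Executor side: each task reads the same Configuration back out of the broadcast.
    val flags = spark.sparkContext.parallelize(1 to 4, 2).map { _ =>
      broadcastedConf.value.value.get("example.reader.option")
    }.collect()

    println(flags.mkString(", "))
    spark.stop()
  }
}

Broadcasting the serialized Configuration once per job avoids re-shipping the Hadoop settings with every task closure, while keeping whatever was set on the driver (here, the Parquet read support class and schema keys) visible to the per-task record readers.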