diff options
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala | 92 |
1 file changed, 45 insertions, 47 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 8c819f1a48..9502b835a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.{Map => JMap} -import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter} +import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.ReadSupport.ReadContext @@ -29,34 +29,62 @@ import org.apache.parquet.schema.Type.Repetition import org.apache.parquet.schema._ import org.apache.spark.Logging +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ +/** + * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst + * [[InternalRow]]s. + * + * The API interface of [[ReadSupport]] is a little bit over complicated because of historical + * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be + * instantiated and initialized twice on both driver side and executor side. The [[init()]] method + * is for driver side initialization, while [[prepareForRead()]] is for executor side. However, + * starting from parquet-mr 1.6.0, it's no longer the case, and [[ReadSupport]] is only instantiated + * and initialized on executor side. So, theoretically, now it's totally fine to combine these two + * methods into a single initialization method. The only reason (I could think of) to still have + * them here is for parquet-mr API backwards-compatibility. 
+ * + * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] + * to [[prepareForRead()]], but use a private `var` for simplicity. + */ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { - // Called after `init()` when initializing Parquet record reader. + private var catalystRequestedSchema: StructType = _ + + /** + * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record + * readers. Responsible for figuring out Parquet requested schema used for column pruning. + */ + override def init(context: InitContext): ReadContext = { + catalystRequestedSchema = { + // scalastyle:off jobcontext + val conf = context.getConfiguration + // scalastyle:on jobcontext + val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) + assert(schemaString != null, "Parquet requested schema not set.") + StructType.fromString(schemaString) + } + + val parquetRequestedSchema = + CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) + + new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava) + } + + /** + * Called on executor side after [[init()]], before instantiating actual Parquet record readers. + * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet + * records to Catalyst [[InternalRow]]s. 
+ */ override def prepareForRead( conf: Configuration, keyValueMetaData: JMap[String, String], fileSchema: MessageType, readContext: ReadContext): RecordMaterializer[InternalRow] = { log.debug(s"Preparing for read Parquet file with message type: $fileSchema") - - val toCatalyst = new CatalystSchemaConverter(conf) val parquetRequestedSchema = readContext.getRequestedSchema - val catalystRequestedSchema = - Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata => - metadata - // First tries to read requested schema, which may result from projections - .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) - // If not available, tries to read Catalyst schema from file metadata. It's only - // available if the target file is written by Spark SQL. - .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY)) - }.map(StructType.fromString).getOrElse { - logInfo("Catalyst schema not available, falling back to Parquet schema") - toCatalyst.convert(parquetRequestedSchema) - } - logInfo { s"""Going to read the following fields from the Parquet file: | @@ -69,36 +97,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema) } - - // Called before `prepareForRead()` when initializing Parquet record reader. - override def init(context: InitContext): ReadContext = { - val conf = { - // scalastyle:off jobcontext - context.getConfiguration - // scalastyle:on jobcontext - } - - // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst - // schema of this file from its metadata. - val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA)) - - // Optional schema of requested columns, in the form of a string serialized from a Catalyst - // `StructType` containing all requested columns. 
- val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) - - val parquetRequestedSchema = - maybeRequestedSchema.fold(context.getFileSchema) { schemaString => - val catalystRequestedSchema = StructType.fromString(schemaString) - CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) - } - - val metadata = - Map.empty[String, String] ++ - maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ - maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _) - - new ReadContext(parquetRequestedSchema, metadata.asJava) - } } private[parquet] object CatalystReadSupport { |