author     Cheng Lian <lian@databricks.com>    2015-09-28 13:53:45 -0700
committer  Davies Liu <davies.liu@gmail.com>   2015-09-28 13:53:45 -0700
commit     14978b785a43e0c13c8bdfd52d20cc8984984ba3 (patch)
tree       64ea81d949967637a171ac564d3ed479c6ea771f /sql
parent     353c30bd7dfbd3b76fc8bc9a6dfab9321439a34b (diff)
[SPARK-10395] [SQL] Simplifies CatalystReadSupport
Please refer to [SPARK-10395] [1] for details.

[1]: https://issues.apache.org/jira/browse/SPARK-10395

Author: Cheng Lian <lian@databricks.com>

Closes #8553 from liancheng/spark-10395/simplify-parquet-read-support.
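For orientation, here is a minimal sketch (not part of this commit) of the schema hand-off that the simplified read path below relies on: the driver serializes the Catalyst requested schema into the Hadoop configuration as JSON, and the executor-side init() reads it back and keeps it around for prepareForRead(). The configuration key value and the object name in the sketch are made up for illustration; only the StructType JSON round-trip mirrors what the diff below does via CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA and StructType.fromString.

// Sketch only; assumed names are marked in comments.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

object RequestedSchemaSketch {
  // Hypothetical key; the real constant is CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
  // whose string value is not shown in this diff.
  val RequestedSchemaKey = "example.parquet.row.requested_schema"

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // "Driver side": serialize only the projected columns as JSON into the configuration.
    val requested = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType)))
    conf.set(RequestedSchemaKey, requested.json)

    // "Executor side": what the new init() does -- read the JSON back, fail fast if missing,
    // and keep the schema (in the real code, in a private var) for prepareForRead().
    val schemaString = conf.get(RequestedSchemaKey)
    require(schemaString != null, "Parquet requested schema not set.")
    val catalystRequestedSchema = DataType.fromJson(schemaString).asInstanceOf[StructType]
    println(catalystRequestedSchema)
  }
}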
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala  92
1 file changed, 45 insertions, 47 deletions
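The other half of the new init() clips the Parquet file schema against that requested schema (CatalystReadSupport.clipParquetSchema in the diff below) so that only requested columns are read. As a hedged illustration of what that pruning achieves, the following sketch uses parquet-mr's schema API directly and only prunes top-level columns; it is not the implementation of clipParquetSchema, which has to handle nested fields as well.

// Simplified, top-level-only illustration of column pruning against a Parquet file schema.
import scala.collection.JavaConverters._
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

object ClipSketch {
  def main(args: Array[String]): Unit = {
    // Pretend this is the full schema of the Parquet file being read.
    val fileSchema = MessageTypeParser.parseMessageType(
      """message spark_schema {
        |  required int32 id;
        |  optional binary name (UTF8);
        |  optional double score;
        |}""".stripMargin)

    // Only these columns were requested by the query (e.g. via a projection).
    val requestedNames = Set("id", "score")

    // Keep just the requested top-level fields; everything else is pruned.
    val clipped = new MessageType(
      fileSchema.getName,
      fileSchema.getFields.asScala.filter(f => requestedNames(f.getName)).asJava)

    println(clipped)
  }
}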
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 8c819f1a48..9502b835a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.util.{Map => JMap}
-import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
+import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
@@ -29,34 +29,62 @@ import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema._
import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
+/**
+ * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
+ * [[InternalRow]]s.
+ *
+ * The [[ReadSupport]] API is a little over-complicated for historical reasons. In older versions
+ * of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] needed to be instantiated and
+ * initialized twice, once on the driver side and once on the executor side: the [[init()]] method
+ * was for driver side initialization, while [[prepareForRead()]] was for the executor side.
+ * However, starting from parquet-mr 1.6.0, this is no longer the case, and [[ReadSupport]] is only
+ * instantiated and initialized on the executor side. So, in principle, it's now fine to combine
+ * these two methods into a single initialization method. The only reason (that I can think of) to
+ * still have them here is parquet-mr API backwards-compatibility.
+ *
+ * For this reason, we no longer rely on [[ReadContext]] to pass the requested schema from
+ * [[init()]] to [[prepareForRead()]], but use a private `var` for simplicity.
+ */
private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
- // Called after `init()` when initializing Parquet record reader.
+ private var catalystRequestedSchema: StructType = _
+
+ /**
+ * Called on the executor side before [[prepareForRead()]] and before actual Parquet record
+ * readers are instantiated. Responsible for figuring out the Parquet requested schema used for
+ * column pruning.
+ */
+ override def init(context: InitContext): ReadContext = {
+ catalystRequestedSchema = {
+ // scalastyle:off jobcontext
+ val conf = context.getConfiguration
+ // scalastyle:on jobcontext
+ val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+ assert(schemaString != null, "Parquet requested schema not set.")
+ StructType.fromString(schemaString)
+ }
+
+ val parquetRequestedSchema =
+ CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+
+ new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+ }
+
+ /**
+ * Called on the executor side after [[init()]], before instantiating actual Parquet record readers.
+ * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
+ * records to Catalyst [[InternalRow]]s.
+ */
override def prepareForRead(
conf: Configuration,
keyValueMetaData: JMap[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[InternalRow] = {
log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
-
- val toCatalyst = new CatalystSchemaConverter(conf)
val parquetRequestedSchema = readContext.getRequestedSchema
- val catalystRequestedSchema =
- Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata =>
- metadata
- // First tries to read requested schema, which may result from projections
- .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
- // If not available, tries to read Catalyst schema from file metadata. It's only
- // available if the target file is written by Spark SQL.
- .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
- }.map(StructType.fromString).getOrElse {
- logInfo("Catalyst schema not available, falling back to Parquet schema")
- toCatalyst.convert(parquetRequestedSchema)
- }
-
logInfo {
s"""Going to read the following fields from the Parquet file:
|
@@ -69,36 +97,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
}
-
- // Called before `prepareForRead()` when initializing Parquet record reader.
- override def init(context: InitContext): ReadContext = {
- val conf = {
- // scalastyle:off jobcontext
- context.getConfiguration
- // scalastyle:on jobcontext
- }
-
- // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
- // schema of this file from its metadata.
- val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
-
- // Optional schema of requested columns, in the form of a string serialized from a Catalyst
- // `StructType` containing all requested columns.
- val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
-
- val parquetRequestedSchema =
- maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
- val catalystRequestedSchema = StructType.fromString(schemaString)
- CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
- }
-
- val metadata =
- Map.empty[String, String] ++
- maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
- maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
-
- new ReadContext(parquetRequestedSchema, metadata.asJava)
- }
}
private[parquet] object CatalystReadSupport {