From 6722aca809ddc28aa20abf3bbb2e0de8629a9903 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 8 Jul 2015 10:09:50 -0700
Subject: [SPARK-8785] [SQL] Improve Parquet schema merging

JIRA: https://issues.apache.org/jira/browse/SPARK-8785

Currently, Parquet schema merging (`ParquetRelation2.readSchema`) may spend
a lot of time merging duplicate schemas. We can instead select only the
distinct schemas and merge those.

Author: Liang-Chi Hsieh

Closes #7182 from viirya/improve_parquet_merging and squashes the following commits:

5cf934f [Liang-Chi Hsieh] Refactor it to make it faster.
f3411ea [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into improve_parquet_merging
a63c3ff [Liang-Chi Hsieh] Improve Parquet schema merging.

---
 .../org/apache/spark/sql/parquet/newParquet.scala | 82 +++++++++++++---------
 1 file changed, 48 insertions(+), 34 deletions(-)
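The core idea: since footers written by the same job carry identical serialized
schema strings, parse and merge each distinct schema only once instead of once
per footer. A minimal standalone sketch of that idea follows; the types and the
`parse`/`merge` functions are simplified stand-ins, not Spark's actual API.

  import scala.collection.mutable

  object SchemaMergeSketch {
    // A "schema" is modeled as a set of column names; its serialized form is a string.
    type Schema = Set[String]

    // parse and merge stand in for the expensive per-schema operations.
    def mergeDistinct(
        serialized: Seq[String],
        parse: String => Schema,
        merge: (Schema, Schema) => Schema): Option[Schema] = {
      val seen = mutable.HashSet[String]()
      // Parse each distinct serialized schema exactly once.
      val distinct = serialized.flatMap(s => if (seen.add(s)) Some(parse(s)) else None)
      // Pay the merge cost only for the distinct schemas.
      distinct.reduceOption(merge)
    }

    def main(args: Array[String]): Unit = {
      // 10,000 footers but only two distinct schemas => a single merge call.
      val footers = Seq.fill(5000)("a,b") ++ Seq.fill(5000)("a,c")
      println(mergeDistinct(footers, _.split(",").toSet, _ union _))
      // Some(Set(a, b, c))
    }
  }

With many footers sharing a handful of distinct schemas, the number of
expensive merge (and parse) calls drops from one per footer to one per
distinct schema.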
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 6bc69c6ad0..ce456e7fbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -21,6 +21,7 @@ import java.net.URI
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 import scala.util.Try
 
 import com.google.common.base.Objects
@@ -30,8 +31,9 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.hadoop.metadata.{FileMetaData, CompressionCodecName}
 import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -508,44 +510,56 @@ private[sql] object ParquetRelation2 extends Logging {
 
   private[parquet] def readSchema(
       footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-    footers.map { footer =>
+
+    def parseParquetSchema(schema: MessageType): StructType = {
+      StructType.fromAttributes(
+        // TODO Really no need to use `Attribute` here, we only need to know the data type.
+        ParquetTypesConverter.convertToAttributes(
+          schema,
+          sqlContext.conf.isParquetBinaryAsString,
+          sqlContext.conf.isParquetINT96AsTimestamp))
+    }
+
+    val seen = mutable.HashSet[String]()
+    val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
-      val parquetSchema = metadata.getSchema
-      val maybeSparkSchema = metadata
+      val serializedSchema = metadata
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-        .flatMap { serializedSchema =>
-          // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
-          // whatever is available.
-          Try(DataType.fromJson(serializedSchema))
-            .recover { case _: Throwable =>
-              logInfo(
-                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
-                  "falling back to the deprecated DataType.fromCaseClassString parser.")
-              DataType.fromCaseClassString(serializedSchema)
-            }
-            .recover { case cause: Throwable =>
-              logWarning(
-                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
-                   |\t$serializedSchema
-                 """.stripMargin,
-                cause)
-            }
-            .map(_.asInstanceOf[StructType])
-            .toOption
-        }
-
-      maybeSparkSchema.getOrElse {
-        // Falls back to Parquet schema if Spark SQL schema is absent.
-        StructType.fromAttributes(
-          // TODO Really no need to use `Attribute` here, we only need to know the data type.
-          ParquetTypesConverter.convertToAttributes(
-            parquetSchema,
-            sqlContext.conf.isParquetBinaryAsString,
-            sqlContext.conf.isParquetINT96AsTimestamp))
+      if (serializedSchema == None) {
+        // Falls back to Parquet schema if no Spark SQL schema found.
+        Some(parseParquetSchema(metadata.getSchema))
+      } else if (!seen.contains(serializedSchema.get)) {
+        seen += serializedSchema.get
+
+        // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+        // whatever is available.
+        Some(Try(DataType.fromJson(serializedSchema.get))
+          .recover { case _: Throwable =>
+            logInfo(
+              s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                "falling back to the deprecated DataType.fromCaseClassString parser.")
+            DataType.fromCaseClassString(serializedSchema.get)
+          }
+          .recover { case cause: Throwable =>
+            logWarning(
+              s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                 |\t$serializedSchema
+               """.stripMargin,
+              cause)
+          }
+          .map(_.asInstanceOf[StructType])
+          .getOrElse {
+            // Falls back to Parquet schema if Spark SQL schema can't be parsed.
+            parseParquetSchema(metadata.getSchema)
+          })
+      } else {
+        None
       }
-    }.reduceOption { (left, right) =>
+    }
+
+    finalSchemas.reduceOption { (left, right) =>
       try left.merge(right) catch { case e: Throwable =>
         throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
       }
--
cgit v1.2.3