author     Liang-Chi Hsieh <viirya@gmail.com>       2015-07-08 10:09:50 -0700
committer  Cheng Lian <lian@databricks.com>         2015-07-08 10:09:50 -0700
commit     6722aca809ddc28aa20abf3bbb2e0de8629a9903 (patch)
tree       9e11f3a4da4feebee391bfcaa211536bd7ba6be9
parent     bf02e377168f39459d5c216e939097ae5705f573 (diff)
[SPARK-8785] [SQL] Improve Parquet schema merging
JIRA: https://issues.apache.org/jira/browse/SPARK-8785

Currently, Parquet schema merging (`ParquetRelation2.readSchema`) can spend a lot of time merging duplicate schemas. We can instead select only the distinct schemas and merge those.

Author: Liang-Chi Hsieh <viirya@gmail.com>
Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #7182 from viirya/improve_parquet_merging and squashes the following commits:

5cf934f [Liang-Chi Hsieh] Refactor it to make it faster.
f3411ea [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into improve_parquet_merging
a63c3ff [Liang-Chi Hsieh] Improve Parquet schema merging.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala | 82
1 file changed, 48 insertions(+), 34 deletions(-)
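As a rough, self-contained sketch of the idea behind this change (not the actual Spark code): rather than parsing and merging one schema per footer, keep a set of serialized schemas already seen and merge only the distinct ones. The mergeDistinct helper and the string schemas below are hypothetical stand-ins for StructType and its merge logic:

import scala.collection.mutable

// Merge only the distinct schemas instead of folding one schema per footer.
// `schemas` stands in for the serialized Spark schemas read from each footer's
// key-value metadata; `merge` stands in for StructType merging.
def mergeDistinct[T](schemas: Seq[T])(merge: (T, T) => T): Option[T] = {
  val seen = mutable.HashSet[T]()
  val distinct = schemas.filter(seen.add) // seen.add returns true only on first occurrence
  distinct.reduceOption(merge)
}

// Example: 10,000 footers sharing only two distinct schemas trigger a single
// merge call instead of 9,999.
val footerSchemas = Seq.fill(5000)("schemaA") ++ Seq.fill(5000)("schemaB")
val merged = mergeDistinct(footerSchemas)((a, b) => s"$a+$b") // Some("schemaA+schemaB")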
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 6bc69c6ad0..ce456e7fbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -21,6 +21,7 @@ import java.net.URI
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 import scala.util.Try
 
 import com.google.common.base.Objects
@@ -30,8 +31,9 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.hadoop.metadata.{FileMetaData, CompressionCodecName}
 import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -508,44 +510,56 @@ private[sql] object ParquetRelation2 extends Logging {
   private[parquet] def readSchema(
       footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-    footers.map { footer =>
+
+    def parseParquetSchema(schema: MessageType): StructType = {
+      StructType.fromAttributes(
+        // TODO Really no need to use `Attribute` here, we only need to know the data type.
+        ParquetTypesConverter.convertToAttributes(
+          schema,
+          sqlContext.conf.isParquetBinaryAsString,
+          sqlContext.conf.isParquetINT96AsTimestamp))
+    }
+
+    val seen = mutable.HashSet[String]()
+    val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
-      val parquetSchema = metadata.getSchema
-      val maybeSparkSchema = metadata
+      val serializedSchema = metadata
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-        .flatMap { serializedSchema =>
-          // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
-          // whatever is available.
-          Try(DataType.fromJson(serializedSchema))
-            .recover { case _: Throwable =>
-              logInfo(
-                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
-                  "falling back to the deprecated DataType.fromCaseClassString parser.")
-              DataType.fromCaseClassString(serializedSchema)
-            }
-            .recover { case cause: Throwable =>
-              logWarning(
-                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
-                   |\t$serializedSchema
-                 """.stripMargin,
-                cause)
-            }
-            .map(_.asInstanceOf[StructType])
-            .toOption
-        }
-
-      maybeSparkSchema.getOrElse {
-        // Falls back to Parquet schema if Spark SQL schema is absent.
-        StructType.fromAttributes(
-          // TODO Really no need to use `Attribute` here, we only need to know the data type.
-          ParquetTypesConverter.convertToAttributes(
-            parquetSchema,
-            sqlContext.conf.isParquetBinaryAsString,
-            sqlContext.conf.isParquetINT96AsTimestamp))
+      if (serializedSchema == None) {
+        // Falls back to Parquet schema if no Spark SQL schema found.
+        Some(parseParquetSchema(metadata.getSchema))
+      } else if (!seen.contains(serializedSchema.get)) {
+        seen += serializedSchema.get
+
+        // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+        // whatever is available.
+        Some(Try(DataType.fromJson(serializedSchema.get))
+          .recover { case _: Throwable =>
+            logInfo(
+              s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                "falling back to the deprecated DataType.fromCaseClassString parser.")
+            DataType.fromCaseClassString(serializedSchema.get)
+          }
+          .recover { case cause: Throwable =>
+            logWarning(
+              s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                 |\t$serializedSchema
+               """.stripMargin,
+              cause)
+          }
+          .map(_.asInstanceOf[StructType])
+          .getOrElse {
+            // Falls back to Parquet schema if Spark SQL schema can't be parsed.
+            parseParquetSchema(metadata.getSchema)
+          })
+      } else {
+        None
       }
-    }.reduceOption { (left, right) =>
+    }
+
+    finalSchemas.reduceOption { (left, right) =>
       try left.merge(right) catch { case e: Throwable =>
         throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
       }
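For context on the reduceOption fold at the end: StructType merging fails when two footers disagree on a column's type, which is what surfaces as the SparkException above. A small illustration of such an incompatible pair, using public Spark SQL types (StructType.merge itself is package-private within Spark SQL, so this only shows the shapes involved):

import org.apache.spark.sql.types._

// Two per-footer schemas that agree on the column name but not its type;
// folding these would hit the "Failed to merge incompatible schemas" path above.
val left = StructType(Seq(StructField("id", IntegerType)))
val right = StructType(Seq(StructField("id", StringType)))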