author     Cheng Lian <lian@databricks.com>   2016-01-30 23:02:49 -0800
committer  Reynold Xin <rxin@databricks.com>  2016-01-30 23:02:49 -0800
commit     a1303de0a0e9d0c80327977abf52a79e2aa95e1f (patch)
tree       4e4af212fd658a4f91ba8205d631d26462bb1775 /sql
parent     de283719980ae78b740e507e4d70c7ebbf6c5f74 (diff)
[SPARK-13070][SQL] Better error message when Parquet schema merging fails
Make sure we throw better error messages when Parquet schema merging fails.

Author: Cheng Lian <lian@databricks.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #10979 from viirya/schema-merging-failure-message.
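For context, a minimal sketch of how the improved message surfaces. This is illustrative only: the paths are hypothetical, and `sqlContext` is a Spark 1.6-era SQLContext, as in the test suites below.

    // Two Parquet directories whose "id" columns have incompatible types.
    sqlContext.range(3).write.parquet("/tmp/t/p=1")                                      // id: BIGINT
    sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet("/tmp/t/p=2")  // id: INT

    // With schema merging enabled, the failure now names the offending file
    // and prints its schema instead of only the low-level type conflict:
    //   org.apache.spark.SparkException: Failed merging schema of file ...
    sqlContext.read.option("mergeSchema", "true").parquet("/tmp/t").schema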
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala                             |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala    | 33
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 30
4 files changed, 77 insertions, 7 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index da0c92864e..c9e7e7fe63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -424,13 +424,13 @@ object StructType extends AbstractDataType {
if ((leftPrecision == rightPrecision) && (leftScale == rightScale)) {
DecimalType(leftPrecision, leftScale)
} else if ((leftPrecision != rightPrecision) && (leftScale != rightScale)) {
- throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+ throw new SparkException("Failed to merge decimal types with incompatible " +
s"precision $leftPrecision and $rightPrecision & scale $leftScale and $rightScale")
} else if (leftPrecision != rightPrecision) {
- throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+ throw new SparkException("Failed to merge decimal types with incompatible " +
s"precision $leftPrecision and $rightPrecision")
} else {
- throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+ throw new SparkException("Failed to merge decimal types with incompatible " +
s"scale $leftScale and $rightScale")
}
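To illustrate the merge rules this hunk touches, a hedged sketch of driving StructType.merge directly. Note that `merge` is `private[sql]`, so this only compiles from code living under the `org.apache.spark.sql` package, as the internal callers do; the field name and decimal parameters here are made up.

    import org.apache.spark.sql.types._

    val left  = StructType(Seq(StructField("d", DecimalType(10, 2))))
    val right = StructType(Seq(StructField("d", DecimalType(12, 2))))

    // Equal precision and scale merge cleanly; a precision mismatch (as here)
    // now fails with the corrected wording:
    //   Failed to merge decimal types with incompatible precision 10 and 12
    left.merge(right)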
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index f87590095d..1e686d41f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -800,12 +800,37 @@ private[sql] object ParquetRelation extends Logging {
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
writeLegacyParquetFormat = writeLegacyParquetFormat)
- footers.map { footer =>
- ParquetRelation.readSchemaFromFooter(footer, converter)
- }.reduceLeftOption(_ merge _).iterator
+ if (footers.isEmpty) {
+ Iterator.empty
+ } else {
+ var mergedSchema = ParquetRelation.readSchemaFromFooter(footers.head, converter)
+ footers.tail.foreach { footer =>
+ val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
+ try {
+ mergedSchema = mergedSchema.merge(schema)
+ } catch { case cause: SparkException =>
+ throw new SparkException(
+ s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
+ }
+ }
+ Iterator.single(mergedSchema)
+ }
}.collect()
- partiallyMergedSchemas.reduceLeftOption(_ merge _)
+ if (partiallyMergedSchemas.isEmpty) {
+ None
+ } else {
+ var finalSchema = partiallyMergedSchemas.head
+ partiallyMergedSchemas.tail.foreach { schema =>
+ try {
+ finalSchema = finalSchema.merge(schema)
+ } catch { case cause: SparkException =>
+ throw new SparkException(
+ s"Failed merging schema:\n${schema.treeString}", cause)
+ }
+ }
+ Some(finalSchema)
+ }
}
/**
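Both rewritten reductions above follow the same shape: fold over the schemas, and on failure rethrow with context attached. A condensed sketch of that pattern (the helper name `mergeWithContext` and the `describe` parameter are hypothetical; the patch inlines the pattern twice with different context strings, and this again assumes sql-package visibility for `StructType.merge`):

    import org.apache.spark.SparkException
    import org.apache.spark.sql.types.StructType

    // Fold with a contextual rethrow: behaves like reduceLeftOption(_ merge _),
    // except each failed step reports which schema could not be merged.
    def mergeWithContext(
        schemas: Seq[StructType],
        describe: StructType => String): Option[StructType] =
      schemas.reduceLeftOption { (merged, next) =>
        try merged.merge(next) catch {
          case cause: SparkException =>
            throw new SparkException(s"Failed merging schema:\n${describe(next)}", cause)
        }
      }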
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 1796b3af0e..3ded32c450 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -421,6 +421,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
// We will remove the temporary metadata when writing Parquet file.
val forPathSix = sqlContext.read.parquet(pathSix).schema
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+ // Sanity check: make sure the optional-field metadata is not set incorrectly.
+ val pathSeven = s"${dir.getCanonicalPath}/table7"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+ val pathEight = s"${dir.getCanonicalPath}/table8"
+ (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+ val df2 = sqlContext.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+ checkAnswer(
+ df2,
+ Row(1, "1"))
+
+ // The fields "a" and "b" exist in both Parquet files, so no optional-field metadata is set.
+ assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+ assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
}
}
}
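For readers verifying the same property interactively, a hedged sketch of inspecting the per-field metadata the new assertions check. The paths are hypothetical, and `metadataKeyForOptionalField` is `private[sql]`, so this assumes the same package visibility the test suite has.

    import org.apache.spark.sql.types.StructType

    val df = sqlContext.read.parquet("/tmp/t/table7", "/tmp/t/table8")
    df.schema.fields.foreach { f =>
      // The optional-field marker should be absent whenever a field exists in
      // every file being read, as "a" and "b" do here.
      println(s"${f.name} marked optional: " +
        f.metadata.contains(StructType.metadataKeyForOptionalField))
    }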
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 60fa81b1ab..d860651d42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.parquet.schema.MessageTypeParser
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -449,6 +450,35 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}.getMessage.contains("detected conflicting schemas"))
}
+ test("schema merging failure error message") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ sqlContext.range(3).write.parquet(s"$path/p=1")
+ sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
+
+ val message = intercept[SparkException] {
+ sqlContext.read.option("mergeSchema", "true").parquet(path).schema
+ }.getMessage
+
+ assert(message.contains("Failed merging schema of file"))
+ }
+
+ // Test the second (driver-side) merge phase: with parallelism raised above
+ // the number of files, each task merges at most one footer, so the conflict
+ // only surfaces when the per-task schemas are merged on the driver.
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ sqlContext.range(3).write.parquet(s"$path/p=1")
+ sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
+
+ sqlContext.sparkContext.conf.set("spark.default.parallelism", "20")
+
+ val message = intercept[SparkException] {
+ sqlContext.read.option("mergeSchema", "true").parquet(path).schema
+ }.getMessage
+
+ assert(message.contains("Failed merging schema:"))
+ }
+ }
+
// =======================================================
// Tests for converting Parquet LIST to Catalyst ArrayType
// =======================================================