author    Cheng Lian <lian@databricks.com>    2015-03-21 11:18:45 +0800
committer Cheng Lian <lian@databricks.com>    2015-03-21 11:18:45 +0800
commit    937c1e5503963e67a5412be993d30dbec6fc9883 (patch)
tree      4c15793c4045c44720d004b2a36aa3180e7adefd /sql
parent    bc37c9743e065a0c756363c7b70e88f22a6e6edd (diff)
[SPARK-6315] [SQL] Also tries the case class string parser while reading Parquet schema
When writing Parquet files, Spark 1.1.x persisted the schema into Parquet key-value metadata as the result of `StructType.toString`. Spark 1.2 deprecated that format in favor of a schema string in JSON form, but we still need to take the old format into account while reading Parquet files written by older versions.

Author: Cheng Lian <lian@databricks.com>

Closes #5034 from liancheng/spark-6315 and squashes the following commits:

a182f58 [Cheng Lian] Adds a regression test
b9c6dbe [Cheng Lian] Also tries the case class string parser while reading Parquet schema
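In essence, the patch wraps the JSON parse in a `Try` and recovers with the deprecated case class string parser. A minimal, self-contained sketch of that fallback chain follows; `fromJson` and `fromCaseClassString` here are toy stand-ins for `DataType.fromJson` and `DataType.fromCaseClassString`, not the real Spark API:

```scala
import scala.util.Try

object SchemaFallbackSketch {
  // Toy stand-ins for the two Spark parsers: each throws when the input
  // is not in its format.
  def fromJson(s: String): String =
    if (s.trim.startsWith("{")) "json:" + s else sys.error("not JSON")

  def fromCaseClassString(s: String): String =
    if (s.startsWith("StructType(")) "caseClass:" + s
    else sys.error("not a case class string")

  // The fallback chain the patch introduces: try JSON first, then the
  // deprecated case class string format; yield None only if both fail.
  def parse(serialized: String): Option[String] =
    Try(fromJson(serialized))
      .recover { case _: Throwable => fromCaseClassString(serialized) }
      .toOption

  def main(args: Array[String]): Unit = {
    println(parse("""{"type":"struct","fields":[]}"""))                 // Some(json:...)
    println(parse("StructType(Seq(StructField(a,IntegerType,false)))")) // Some(caseClass:...)
    println(parse("garbage"))                                           // None: both parsers failed
  }
}
```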
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala     | 23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala | 42
2 files changed, 60 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index fbe7a419fe..410600b052 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -681,7 +681,7 @@ private[sql] case class ParquetRelation2(
}
}
-private[sql] object ParquetRelation2 {
+private[sql] object ParquetRelation2 extends Logging {
// Whether we should merge schemas collected from all Parquet part-files.
val MERGE_SCHEMA = "mergeSchema"
@@ -701,7 +701,26 @@ private[sql] object ParquetRelation2 {
.getKeyValueMetaData
.toMap
.get(RowReadSupport.SPARK_METADATA_KEY)
- .map(DataType.fromJson(_).asInstanceOf[StructType])
+ .flatMap { serializedSchema =>
+ // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+ // whatever is available.
+ Try(DataType.fromJson(serializedSchema))
+ .recover { case _: Throwable =>
+ logInfo(
+ s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+ "falling back to the deprecated DataType.fromCaseClassString parser.")
+ DataType.fromCaseClassString(serializedSchema)
+ }
+ .recover { case cause: Throwable =>
+ logWarning(
+ s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+ |\t$serializedSchema
+ """.stripMargin,
+ cause)
+ }
+ .map(_.asInstanceOf[StructType])
+ .toOption
+ }
maybeSparkSchema.getOrElse {
// Falls back to Parquet schema if Spark SQL schema is absent.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index a70b3c7ce4..5438095add 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -28,8 +28,8 @@ import parquet.example.data.simple.SimpleGroup
import parquet.example.data.{Group, GroupWriter}
import parquet.hadoop.api.WriteSupport
import parquet.hadoop.api.WriteSupport.WriteContext
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.{ParquetFileWriter, ParquetWriter}
+import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
+import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
import parquet.io.api.RecordConsumer
import parquet.schema.{MessageType, MessageTypeParser}
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.implicits._
-import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
@@ -330,6 +330,42 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
}
+ test("SPARK-6315 regression test") {
+ // Spark 1.1 and prior versions write Spark schema as case class string into Parquet metadata.
+ // This has been deprecated by JSON format since 1.2. Notice that, 1.3 further refactored data
+ // types API, and made StructType.fields an array. This makes the result of StructType.toString
+ // different from prior versions: there's no "Seq" wrapping the fields part in the string now.
+ val sparkSchema =
+ "StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))"
+
+ // The Parquet schema is intentionally made different from the Spark schema. Because the new
+ // Parquet data source simply falls back to the Parquet schema once it fails to parse the Spark
+ // schema. By making these two different, we are able to assert the old style case class string
+ // is parsed successfully.
+ val parquetSchema = MessageTypeParser.parseMessageType(
+ """message root {
+ | required int32 c;
+ |}
+ """.stripMargin)
+
+ withTempPath { location =>
+ val extraMetadata = Map(RowReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
+ val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
+ val path = new Path(location.getCanonicalPath)
+
+ ParquetFileWriter.writeMetadataFile(
+ sparkContext.hadoopConfiguration,
+ path,
+ new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil)
+
+ assertResult(parquetFile(path.toString).schema) {
+ StructType(
+ StructField("a", BooleanType, nullable = false) ::
+ StructField("b", IntegerType, nullable = false) ::
+ Nil)
+ }
+ }
+ }
}
class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
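For reference, a hedged illustration of the two serialized schema formats the reader must now handle. The old-style string matches what the regression test above writes into the Parquet footer; the JSON shape follows the output of `StructType.json` (values are illustrative, not captured from a real file):

```scala
// Spark 1.1.x and earlier: StructType.toString, with the Seq(...) wrapper
// that Spark 1.3 dropped when StructType.fields became an Array.
val oldStyleSchema =
  "StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))"

// Spark 1.2+: JSON written via StructType.json and read via DataType.fromJson.
val jsonSchema =
  """{"type":"struct","fields":[
    |  {"name":"a","type":"boolean","nullable":false,"metadata":{}},
    |  {"name":"b","type":"integer","nullable":false,"metadata":{}}]}""".stripMargin
```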