author     Ryan Blue <blue@apache.org>        2016-05-27 16:59:38 -0700
committer  Cheng Lian <lian@databricks.com>   2016-05-27 16:59:38 -0700
commit     776d183c82b424ef7c3cae30537d8afe9b9eee83 (patch)
tree       86fc982928fd18f2396072b00ade94dbd917996b
parent     019afd9c78a9f40e1d07f0a74868010206e90ed5 (diff)
[SPARK-9876][SQL] Update Parquet to 1.8.1.
## What changes were proposed in this pull request?

This includes minimal changes to get Spark using the current release of Parquet, 1.8.1.

## How was this patch tested?

This uses the existing Parquet tests.

Author: Ryan Blue <blue@apache.org>

Closes #13280 from rdblue/SPARK-9876-update-parquet.
-rw-r--r--  dev/deps/spark-deps-hadoop-2.2                                                                              | 11
-rw-r--r--  dev/deps/spark-deps-hadoop-2.3                                                                              | 11
-rw-r--r--  dev/deps/spark-deps-hadoop-2.4                                                                              | 11
-rw-r--r--  dev/deps/spark-deps-hadoop-2.6                                                                              | 11
-rw-r--r--  dev/deps/spark-deps-hadoop-2.7                                                                              | 11
-rw-r--r--  pom.xml                                                                                                     |  2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala        | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala    | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala             | 83
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala         | 20
11 files changed, 91 insertions(+), 117 deletions(-)
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 578691cc93..deec033c21 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -129,14 +129,13 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index fc6306f366..43c7dd3580 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -136,14 +136,13 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index dee1417c79..7186b305a8 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -136,14 +136,13 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 9695661b9c..3e4ed74cc6 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -144,14 +144,13 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 18c136ed63..6b999538a3 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -145,14 +145,13 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/pom.xml b/pom.xml
index 3fa0eeb5f0..ce9aa9aa00 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.11.1.1</derby.version>
- <parquet.version>1.7.0</parquet.version>
+ <parquet.version>1.8.1</parquet.version>
<hive.parquet.version>1.6.0</hive.parquet.version>
<jetty.version>9.2.16.v20160414</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index cbe8f78164..3f7a872ff6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -58,6 +58,8 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.apache.spark.sql.types.StructType;
@@ -186,15 +188,19 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
if (columns == null) {
this.requestedSchema = fileSchema;
} else {
- Types.MessageTypeBuilder builder = Types.buildMessage();
- for (String s: columns) {
- if (!fileSchema.containsField(s)) {
- throw new IOException("Can only project existing columns. Unknown field: " + s +
- " File schema:\n" + fileSchema);
+ if (columns.size() > 0) {
+ Types.MessageTypeBuilder builder = Types.buildMessage();
+ for (String s: columns) {
+ if (!fileSchema.containsField(s)) {
+ throw new IOException("Can only project existing columns. Unknown field: " + s +
+ " File schema:\n" + fileSchema);
+ }
+ builder.addFields(fileSchema.getType(s));
}
- builder.addFields(fileSchema.getType(s));
+ this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
+ } else {
+ this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
}
- this.requestedSchema = builder.named("spark_schema");
}
this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 850e807b86..9c885b252f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -109,10 +109,14 @@ private[parquet] object CatalystReadSupport {
*/
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
- Types
- .buildMessage()
- .addFields(clippedParquetFields: _*)
- .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+ if (clippedParquetFields.isEmpty) {
+ CatalystSchemaConverter.EMPTY_MESSAGE
+ } else {
+ Types
+ .buildMessage()
+ .addFields(clippedParquetFields: _*)
+ .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+ }
}
private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 6f6340f541..3688c3e2b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -538,6 +538,22 @@ private[parquet] class CatalystSchemaConverter(
private[parquet] object CatalystSchemaConverter {
val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
+ // !! HACK ALERT !!
+ //
+ // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType,
+ // which prevents us from selecting no columns at all for queries like `SELECT COUNT(*) FROM t`.
+ // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
+ //
+ // To work around this problem, here we first construct a `MessageType` with a single dummy
+ // field, and then remove the field to obtain an empty `MessageType`.
+ //
+ // TODO Revert this change after upgrading parquet-mr to 1.8.2+
+ val EMPTY_MESSAGE = Types
+ .buildMessage()
+ .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
+ .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+ EMPTY_MESSAGE.getFields.clear()
+
def checkFieldName(name: String): Unit = {
// ,;{}()\n\t= and space are special characters in Parquet schema
checkConversionRequirement(
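
The EMPTY_MESSAGE workaround above exists because parquet-mr 1.8.1 rejects empty group types, yet `clipParquetSchema` (see the CatalystReadSupport hunk) and the vectorized reader both need a zero-column `MessageType` for queries such as `SELECT COUNT(*) FROM t`. The following is a minimal, self-contained sketch of the same dummy-field-then-clear trick; the object name, `main` method, and printed values are illustrative only and not part of the patch, and it assumes only `org.apache.parquet:parquet-column:1.8.1` on the classpath.

```scala
import org.apache.parquet.schema.{MessageType, PrimitiveType, Types}

object EmptyMessageSketch {
  // parquet-mr 1.8.1 throws when a group is built with zero fields, so build a
  // message with one dummy INT32 field and then clear the field list, mirroring
  // CatalystSchemaConverter.EMPTY_MESSAGE above.
  val emptyMessage: MessageType = {
    val msg = Types
      .buildMessage()
      .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
      .named("spark_schema")
    msg.getFields.clear() // mutates the field list in place, leaving zero fields
    msg
  }

  def main(args: Array[String]): Unit = {
    println(emptyMessage.getFieldCount) // 0: no Parquet columns will be materialized
    println(emptyMessage)               // an empty message named spark_schema
  }
}
```

Because this mutates state inside the constructed `MessageType`, the patch flags it as a hack to be reverted once parquet-mr 1.8.2+ ships the proper fix (PARQUET-278/PARQUET-363).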
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 95afdc789f..6240812501 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,8 +22,6 @@ import java.io.Serializable
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.OriginalType
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
@@ -53,18 +51,15 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
// Binary.fromString and Binary.fromByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
- Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+ Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
- Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- */
+ Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}
private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -79,17 +74,14 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
- Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+ Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
- Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
- */
+ Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}
private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -102,16 +94,13 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
- Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+ Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- */
+ FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -124,16 +113,13 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
- Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+ Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- */
+ FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -147,15 +133,13 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
- Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+ Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- */
+ FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -168,16 +152,13 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n),
- Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+ Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
- */
+ FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
}
private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -194,17 +175,14 @@ private[sql] object ParquetFilters {
(n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
- // See https://issues.apache.org/jira/browse/SPARK-11153
- /*
case StringType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
- SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
+ SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
case BinaryType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
- SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
- */
+ SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
}
/**
@@ -228,8 +206,6 @@ private[sql] object ParquetFilters {
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = getFieldMap(schema).toMap
- relaxParquetValidTypeMap
-
// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -299,35 +275,4 @@ private[sql] object ParquetFilters {
case _ => None
}
}
-
- // !! HACK ALERT !!
- //
- // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
- // parquet-mr 1.8.1 or higher versions.
- //
- // In Parquet, not all types of columns can be used for filter push-down optimization. The set
- // of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
- // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
- // pushed down.
- //
- // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
- // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
- // a predicate involving a `ENUM` field can be pushed-down as a string column, which is perfectly
- // legal except that it fails the `ValidTypeMap` check.
- //
- // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to workaround this issue.
- private lazy val relaxParquetValidTypeMap: Unit = {
- val constructor = Class
- .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
- .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
-
- constructor.setAccessible(true)
- val enumTypeDescriptor = constructor
- .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
- .asInstanceOf[AnyRef]
-
- val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
- addMethod.setAccessible(true)
- addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
- }
}
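
Most of the churn in ParquetFilters.scala above comes from two parquet-mr changes: the 1.7-era `Binary.fromByteArray(...)` calls are replaced by the more explicit `Binary.fromString` / `Binary.fromReusedByteArray` factories, and 1.8.1 relaxes the `ValidTypeMap` restriction (PARQUET-201) that motivated both the commented-out string/binary cases (SPARK-11153) and the reflection hack removed at the bottom. Below is a minimal sketch of the 1.8.1 call pattern now used for binary predicates; the object name, column names, and values are made up for illustration, and it assumes only parquet-column 1.8.1 on the classpath.

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.io.api.Binary

object BinaryFilterSketch {
  // String equality push-down: Binary.fromString replaces the old
  // Binary.fromByteArray(s.getBytes("utf-8")) pattern seen in the removed lines.
  def stringEq(column: String, value: String): FilterPredicate =
    FilterApi.eq(FilterApi.binaryColumn(column), Binary.fromString(value))

  // Byte-array equality push-down: fromReusedByteArray is the variant the patch
  // uses; it tells Parquet the caller may reuse the backing array.
  def bytesEq(column: String, value: Array[Byte]): FilterPredicate =
    FilterApi.eq(FilterApi.binaryColumn(column), Binary.fromReusedByteArray(value))

  def main(args: Array[String]): Unit = {
    println(stringEq("name", "spark"))
    println(bytesEq("payload", Array[Byte](1, 2, 3)))
  }
}
```

`FilterApi.notEq`, `lt`, `ltEq`, `gt`, and `gtEq` follow the same shape, which is why the hunks above are nearly identical across the six predicate builders.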
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 6db6492282..0b5038cb82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
-import org.apache.parquet.schema.MessageTypeParser
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
@@ -1065,18 +1065,26 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
parquetSchema: String,
catalystSchema: StructType,
expectedSchema: String): Unit = {
+ testSchemaClipping(testName, parquetSchema, catalystSchema,
+ MessageTypeParser.parseMessageType(expectedSchema))
+ }
+
+ private def testSchemaClipping(
+ testName: String,
+ parquetSchema: String,
+ catalystSchema: StructType,
+ expectedSchema: MessageType): Unit = {
test(s"Clipping - $testName") {
- val expected = MessageTypeParser.parseMessageType(expectedSchema)
val actual = CatalystReadSupport.clipParquetSchema(
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
try {
- expected.checkContains(actual)
- actual.checkContains(expected)
+ expectedSchema.checkContains(actual)
+ actual.checkContains(expectedSchema)
} catch { case cause: Throwable =>
fail(
s"""Expected clipped schema:
- |$expected
+ |$expectedSchema
|Actual clipped schema:
|$actual
""".stripMargin,
@@ -1429,7 +1437,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
catalystSchema = new StructType(),
- expectedSchema = "message root {}")
+ expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE)
testSchemaClipping(
"disjoint field sets",