Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala |  38
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala |   2
-rw-r--r--  sql/core/src/test/README.md |  16
-rw-r--r--  sql/core/src/test/avro/parquet-compat.avdl |  13
-rw-r--r--  sql/core/src/test/avro/parquet-compat.avpr |  13
-rw-r--r--  sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java |   2
-rw-r--r--  sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Nested.java |   4
-rw-r--r--  sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetAvroCompat.java |   4
-rw-r--r--  sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetEnum.java | 142
-rw-r--r--  sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Suit.java |  13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala | 105
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala |  33
-rwxr-xr-x  sql/core/src/test/scripts/gen-avro.sh (renamed from sql/core/src/test/scripts/gen-code.sh) |  13
-rwxr-xr-x  sql/core/src/test/scripts/gen-thrift.sh |  27
-rw-r--r--  sql/core/src/test/thrift/parquet-compat.thrift |   2
15 files changed, 332 insertions(+), 95 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 9e2e232f50..63915e0a28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.compat.FilterCompat._
import org.apache.parquet.filter2.predicate.FilterApi._
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics}
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate
+import org.apache.parquet.filter2.predicate._
import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.OriginalType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.expressions._
@@ -197,6 +198,8 @@ private[sql] object ParquetFilters {
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+ relaxParquetValidTypeMap
+
// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -239,6 +242,37 @@ private[sql] object ParquetFilters {
}
}
+ // !! HACK ALERT !!
+ //
+ // This lazy val is a workaround for PARQUET-201 and should be removed once we upgrade to
+ // parquet-mr 1.8.1 or later.
+ //
+ // In Parquet, not all types of columns can be used for filter push-down optimization. The set
+ // of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
+ // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
+ // pushed down.
+ //
+ // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
+ // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
+ // a predicate involving an `ENUM` field can be pushed down as a string column, which is perfectly
+ // legal except that it fails the `ValidTypeMap` check.
+ //
+ // Here we add `BINARY (ENUM)` to `ValidTypeMap` lazily via reflection to work around this issue.
+ private lazy val relaxParquetValidTypeMap: Unit = {
+ val constructor = Class
+ .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
+ .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
+
+ constructor.setAccessible(true)
+ val enumTypeDescriptor = constructor
+ .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
+ .asInstanceOf[AnyRef]
+
+ val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
+ addMethod.setAccessible(true)
+ addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
+ }
+
/**
* Converts Catalyst predicate expressions to Parquet filter predicates.
*
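The bare reference to `relaxParquetValidTypeMap` at the top of `createFilter` is the trigger: the body of a Scala `lazy val` runs exactly once, on first access, so the reflection hack is applied at most once per JVM. A minimal standalone sketch of the idiom, with hypothetical names:

```scala
object RunOnce {
  // The body runs on first reference and never again; the compiler emits
  // JVM-level initialization guards, so concurrent first calls are also safe.
  private lazy val init: Unit = println("one-time setup")

  def work(): Unit = {
    init // forces initialization on the first call; a no-op afterwards
    println("doing work")
  }
}
```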
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index c71c69b6e8..52fac18ba1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -678,7 +678,7 @@ private[sql] object ParquetRelation extends Logging {
val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
- // HACK ALERT:
+ // !! HACK ALERT !!
//
// Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
// to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable`
diff --git a/sql/core/src/test/README.md b/sql/core/src/test/README.md
index 3dd9861b48..421c2ea4f7 100644
--- a/sql/core/src/test/README.md
+++ b/sql/core/src/test/README.md
@@ -6,23 +6,19 @@ The following directories and files are used for Parquet compatibility tests:
.
├── README.md # This file
├── avro
-│   ├── parquet-compat.avdl # Testing Avro IDL
-│   └── parquet-compat.avpr # !! NO TOUCH !! Protocol file generated from parquet-compat.avdl
+│   ├── *.avdl # Testing Avro IDL(s)
+│   └── *.avpr # !! NO TOUCH !! Protocol files generated from Avro IDL(s)
├── gen-java # !! NO TOUCH !! Generated Java code
├── scripts
-│   └── gen-code.sh # Script used to generate Java code for Thrift and Avro
+│   ├── gen-avro.sh # Script used to generate Java code for Avro
+│   └── gen-thrift.sh # Script used to generate Java code for Thrift
└── thrift
- └── parquet-compat.thrift # Testing Thrift schema
+ └── *.thrift # Testing Thrift schema(s)
```
-Generated Java code are used in the following test suites:
-
-- `org.apache.spark.sql.parquet.ParquetAvroCompatibilitySuite`
-- `org.apache.spark.sql.parquet.ParquetThriftCompatibilitySuite`
-
To avoid code generation at build time, the Java code generated from the testing Thrift schema and Avro IDL is also checked in.
-When updating the testing Thrift schema and Avro IDL, please run `gen-code.sh` to update all the generated Java code.
+When updating the testing Thrift schema or Avro IDL, please run `gen-thrift.sh` or `gen-avro.sh` accordingly to regenerate the Java code.
## Prerequisites
diff --git a/sql/core/src/test/avro/parquet-compat.avdl b/sql/core/src/test/avro/parquet-compat.avdl
index 24729f6143..8070d0a917 100644
--- a/sql/core/src/test/avro/parquet-compat.avdl
+++ b/sql/core/src/test/avro/parquet-compat.avdl
@@ -16,8 +16,19 @@
*/
// This is a test protocol for testing parquet-avro compatibility.
-@namespace("org.apache.spark.sql.parquet.test.avro")
+@namespace("org.apache.spark.sql.execution.datasources.parquet.test.avro")
protocol CompatibilityTest {
+ enum Suit {
+ SPADES,
+ HEARTS,
+ DIAMONDS,
+ CLUBS
+ }
+
+ record ParquetEnum {
+ Suit suit;
+ }
+
record Nested {
array<int> nested_ints_column;
string nested_string_column;
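`avro-tools` compiles the new `Suit` enum and `ParquetEnum` record into the generated Java classes added further down in this diff. For orientation, a short sketch of the generated builder API as the test suite below uses it:

```scala
import org.apache.spark.sql.execution.datasources.parquet.test.avro.{ParquetEnum, Suit}

// Construct a record through the Avro-generated builder.
val record: ParquetEnum = ParquetEnum.newBuilder().setSuit(Suit.SPADES).build()
assert(record.getSuit == Suit.SPADES)
```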
diff --git a/sql/core/src/test/avro/parquet-compat.avpr b/sql/core/src/test/avro/parquet-compat.avpr
index a83b7c990d..0603917650 100644
--- a/sql/core/src/test/avro/parquet-compat.avpr
+++ b/sql/core/src/test/avro/parquet-compat.avpr
@@ -1,7 +1,18 @@
{
"protocol" : "CompatibilityTest",
- "namespace" : "org.apache.spark.sql.parquet.test.avro",
+ "namespace" : "org.apache.spark.sql.execution.datasources.parquet.test.avro",
"types" : [ {
+ "type" : "enum",
+ "name" : "Suit",
+ "symbols" : [ "SPADES", "HEARTS", "DIAMONDS", "CLUBS" ]
+ }, {
+ "type" : "record",
+ "name" : "ParquetEnum",
+ "fields" : [ {
+ "name" : "suit",
+ "type" : "Suit"
+ } ]
+ }, {
"type" : "record",
"name" : "Nested",
"fields" : [ {
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java
index 70dec1a9d3..2368323cb3 100644
--- a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java
+++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java
@@ -8,7 +8,7 @@ package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public interface CompatibilityTest {
- public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"CompatibilityTest\",\"namespace\":\"org.apache.spark.sql.parquet.test.avro\",\"types\":[{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"Nested\"},\"avro.java.string\":\"String\"}}]}],\"messages\":{}}");
+ public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"CompatibilityTest\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"types\":[{\"type\":\"enum\",\"name\":\"Suit\",\"symbols\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]},{\"type\":\"record\",\"name\":\"ParquetEnum\",\"fields\":[{\"name\":\"suit\",\"type\":\"Suit\"}]},{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"Nested\"},\"avro.java.string\":\"String\"}}]}],\"messages\":{}}");
@SuppressWarnings("all")
public interface Callback extends CompatibilityTest {
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Nested.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Nested.java
index a0a406bcd1..a7bf484191 100644
--- a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Nested.java
+++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Nested.java
@@ -3,11 +3,11 @@
*
* DO NOT EDIT DIRECTLY
*/
-package org.apache.spark.sql.execution.datasources.parquet.test.avro;
+package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Nested extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Nested\",\"namespace\":\"org.apache.spark.sql.parquet.test.avro\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Nested\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public java.util.List<java.lang.Integer> nested_ints_column;
@Deprecated public java.lang.String nested_string_column;
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetAvroCompat.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetAvroCompat.java
index 6198b00b1e..681cacbd12 100644
--- a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetAvroCompat.java
+++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetAvroCompat.java
@@ -3,11 +3,11 @@
*
* DO NOT EDIT DIRECTLY
*/
-package org.apache.spark.sql.execution.datasources.parquet.test.avro;
+package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"namespace\":\"org.apache.spark.sql.parquet.test.avro\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}},\"avro.java.string\":\"String\"}}]}");
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}},\"avro.java.string\":\"String\"}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public boolean bool_column;
@Deprecated public int int_column;
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetEnum.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetEnum.java
new file mode 100644
index 0000000000..05fefe4cee
--- /dev/null
+++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/ParquetEnum.java
@@ -0,0 +1,142 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.spark.sql.execution.datasources.parquet.test.avro;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class ParquetEnum extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ParquetEnum\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"suit\",\"type\":{\"type\":\"enum\",\"name\":\"Suit\",\"symbols\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]}}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ @Deprecated public org.apache.spark.sql.execution.datasources.parquet.test.avro.Suit suit;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use <code>newBuilder()</code>.
+ */
+ public ParquetEnum() {}
+
+ /**
+ * All-args constructor.
+ */
+ public ParquetEnum(org.apache.spark.sql.execution.datasources.parquet.test.avro.Suit suit) {
+ this.suit = suit;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return suit;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: suit = (org.apache.spark.sql.execution.datasources.parquet.test.avro.Suit)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'suit' field.
+ */
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.Suit getSuit() {
+ return suit;
+ }
+
+ /**
+ * Sets the value of the 'suit' field.
+ * @param value the value to set.
+ */
+ public void setSuit(org.apache.spark.sql.execution.datasources.parquet.test.avro.Suit value) {
+ this.suit = value;
+ }
+
+ /** Creates a new ParquetEnum RecordBuilder */
+ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder newBuilder() {
+ return new org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder();
+ }
+
+ /** Creates a new ParquetEnum RecordBuilder by copying an existing Builder */
+ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder other) {
+ return new org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder(other);
+ }
+
+ /** Creates a new ParquetEnum RecordBuilder by copying an existing ParquetEnum instance */
+ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum other) {
+ return new org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for ParquetEnum instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<ParquetEnum>
+ implements org.apache.avro.data.RecordBuilder<ParquetEnum> {
+
+ private org.apache.spark.sql.execution.datasources.parquet.test.avro.Suit suit;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.suit)) {
+ this.suit = data().deepCopy(fields()[0].schema(), other.suit);
+ fieldSetFlags()[0] = true;
+ }
+ }
+
+ /** Creates a Builder by copying an existing ParquetEnum instance */
+ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum other) {
+ super(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.SCHEMA$);
+ if (isValidValue(fields()[0], other.suit)) {
+ this.suit = data().deepCopy(fields()[0].schema(), other.suit);
+ fieldSetFlags()[0] = true;
+ }
+ }
+
+ /** Gets the value of the 'suit' field */
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.Suit getSuit() {
+ return suit;
+ }
+
+ /** Sets the value of the 'suit' field */
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder setSuit(org.apache.spark.sql.execution.datasources.parquet.test.avro.Suit value) {
+ validate(fields()[0], value);
+ this.suit = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'suit' field has been set */
+ public boolean hasSuit() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'suit' field */
+ public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetEnum.Builder clearSuit() {
+ suit = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ @Override
+ public ParquetEnum build() {
+ try {
+ ParquetEnum record = new ParquetEnum();
+ record.suit = fieldSetFlags()[0] ? this.suit : (org.apache.spark.sql.execution.datasources.parquet.test.avro.Suit) defaultValue(fields()[0]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Suit.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Suit.java
new file mode 100644
index 0000000000..00711a0c2a
--- /dev/null
+++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/Suit.java
@@ -0,0 +1,13 @@
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.spark.sql.execution.datasources.parquet.test.avro;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public enum Suit {
+ SPADES, HEARTS, DIAMONDS, CLUBS ;
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Suit\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"symbols\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
index 4d9c07bb7a..866a975ad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
@@ -22,10 +22,12 @@ import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConversions._
+import org.apache.avro.Schema
+import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.spark.sql.execution.datasources.parquet.test.avro.{Nested, ParquetAvroCompat}
+import org.apache.spark.sql.execution.datasources.parquet.test.avro.{Nested, ParquetAvroCompat, ParquetEnum, Suit}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{Row, SQLContext}
@@ -34,52 +36,55 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest {
override val sqlContext: SQLContext = TestSQLContext
- override protected def beforeAll(): Unit = {
- super.beforeAll()
-
- val writer =
- new AvroParquetWriter[ParquetAvroCompat](
- new Path(parquetStore.getCanonicalPath),
- ParquetAvroCompat.getClassSchema)
-
- (0 until 10).foreach(i => writer.write(makeParquetAvroCompat(i)))
- writer.close()
+ private def withWriter[T <: IndexedRecord]
+ (path: String, schema: Schema)
+ (f: AvroParquetWriter[T] => Unit) = {
+ val writer = new AvroParquetWriter[T](new Path(path), schema)
+ try f(writer) finally writer.close()
}
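`withWriter` is a standard loan pattern: acquire the writer, lend it to `f`, and close it even when `f` throws. A generic sketch of the same shape (the `withResource` name is hypothetical, assuming only `java.lang.AutoCloseable`):

```scala
// Generic loan pattern: the resource is always released, even on failure.
def withResource[R <: AutoCloseable, T](resource: R)(f: R => T): T =
  try f(resource) finally resource.close()
```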
test("Read Parquet file generated by parquet-avro") {
- logInfo(
- s"""Schema of the Parquet file written by parquet-avro:
- |${readParquetSchema(parquetStore.getCanonicalPath)}
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ withWriter[ParquetAvroCompat](path, ParquetAvroCompat.getClassSchema) { writer =>
+ (0 until 10).foreach(i => writer.write(makeParquetAvroCompat(i)))
+ }
+
+ logInfo(
+ s"""Schema of the Parquet file written by parquet-avro:
+ |${readParquetSchema(path)}
""".stripMargin)
- checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), (0 until 10).map { i =>
- def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
-
- Row(
- i % 2 == 0,
- i,
- i.toLong * 10,
- i.toFloat + 0.1f,
- i.toDouble + 0.2d,
- s"val_$i".getBytes,
- s"val_$i",
-
- nullable(i % 2 == 0: java.lang.Boolean),
- nullable(i: Integer),
- nullable(i.toLong: java.lang.Long),
- nullable(i.toFloat + 0.1f: java.lang.Float),
- nullable(i.toDouble + 0.2d: java.lang.Double),
- nullable(s"val_$i".getBytes),
- nullable(s"val_$i"),
-
- Seq.tabulate(3)(n => s"arr_${i + n}"),
- Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap,
- Seq.tabulate(3) { n =>
- (i + n).toString -> Seq.tabulate(3) { m =>
- Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}")
- }
- }.toMap)
- })
+ checkAnswer(sqlContext.read.parquet(path), (0 until 10).map { i =>
+ def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+
+ Row(
+ i % 2 == 0,
+ i,
+ i.toLong * 10,
+ i.toFloat + 0.1f,
+ i.toDouble + 0.2d,
+ s"val_$i".getBytes,
+ s"val_$i",
+
+ nullable(i % 2 == 0: java.lang.Boolean),
+ nullable(i: Integer),
+ nullable(i.toLong: java.lang.Long),
+ nullable(i.toFloat + 0.1f: java.lang.Float),
+ nullable(i.toDouble + 0.2d: java.lang.Double),
+ nullable(s"val_$i".getBytes),
+ nullable(s"val_$i"),
+
+ Seq.tabulate(3)(n => s"arr_${i + n}"),
+ Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap,
+ Seq.tabulate(3) { n =>
+ (i + n).toString -> Seq.tabulate(3) { m =>
+ Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}")
+ }
+ }.toMap)
+ })
+ }
}
def makeParquetAvroCompat(i: Int): ParquetAvroCompat = {
@@ -122,4 +127,20 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest {
.build()
}
+
+ test("SPARK-9407 Don't push down predicates involving Parquet ENUM columns") {
+ import sqlContext.implicits._
+
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ withWriter[ParquetEnum](path, ParquetEnum.getClassSchema) { writer =>
+ (0 until 4).foreach { i =>
+ writer.write(ParquetEnum.newBuilder().setSuit(Suit.values.apply(i)).build())
+ }
+ }
+
+ checkAnswer(sqlContext.read.parquet(path).filter('suit === "SPADES"), Row("SPADES"))
+ }
+ }
}
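For reference, the conversion path the new test exercises: `createFilter` (shown at the top of this diff) maps a data source filter to a Parquet `FilterPredicate`, forcing `relaxParquetValidTypeMap` first. A hedged sketch of a direct call, assuming code in a package with `private[sql]` access:

```scala
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// The Parquet ENUM column surfaces in Spark SQL as a plain string column.
val schema = StructType(Seq(StructField("suit", StringType)))

// Converts the equality predicate into a pushable Parquet predicate;
// relaxParquetValidTypeMap has already admitted BINARY (ENUM) by this point.
val predicate = ParquetFilters.createFilter(schema, sources.EqualTo("suit", "SPADES"))
assert(predicate.isDefined)
```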
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
index 68f35b1f3a..0ea64aa2a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
@@ -16,45 +16,28 @@
*/
package org.apache.spark.sql.execution.datasources.parquet
-import java.io.File
import scala.collection.JavaConversions._
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.QueryTest
-import org.apache.spark.util.Utils
abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with BeforeAndAfterAll {
- protected var parquetStore: File = _
-
- /**
- * Optional path to a staging subdirectory which may be created during query processing
- * (Hive does this).
- * Parquet files under this directory will be ignored in [[readParquetSchema()]]
- * @return an optional staging directory to ignore when scanning for parquet files.
- */
- protected def stagingDir: Option[String] = None
-
- override protected def beforeAll(): Unit = {
- parquetStore = Utils.createTempDir(namePrefix = "parquet-compat_")
- parquetStore.delete()
- }
-
- override protected def afterAll(): Unit = {
- Utils.deleteRecursively(parquetStore)
+ def readParquetSchema(path: String): MessageType = {
+ readParquetSchema(path, { path => !path.getName.startsWith("_") })
}
- def readParquetSchema(path: String): MessageType = {
+ def readParquetSchema(path: String, pathFilter: Path => Boolean): MessageType = {
val fsPath = new Path(path)
val fs = fsPath.getFileSystem(configuration)
- val parquetFiles = fs.listStatus(fsPath).toSeq.filterNot { status =>
- status.getPath.getName.startsWith("_") ||
- stagingDir.map(status.getPath.getName.startsWith).getOrElse(false)
- }
+ val parquetFiles = fs.listStatus(fsPath, new PathFilter {
+ override def accept(path: Path): Boolean = pathFilter(path)
+ }).toSeq
+
val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true)
footers.head.getParquetMetadata.getFileMetaData.getSchema
}
diff --git a/sql/core/src/test/scripts/gen-code.sh b/sql/core/src/test/scripts/gen-avro.sh
index 5d8d8ad085..48174b287f 100755
--- a/sql/core/src/test/scripts/gen-code.sh
+++ b/sql/core/src/test/scripts/gen-avro.sh
@@ -22,10 +22,9 @@ cd -
rm -rf $BASEDIR/gen-java
mkdir -p $BASEDIR/gen-java
-thrift\
- --gen java\
- -out $BASEDIR/gen-java\
- $BASEDIR/thrift/parquet-compat.thrift
-
-avro-tools idl $BASEDIR/avro/parquet-compat.avdl > $BASEDIR/avro/parquet-compat.avpr
-avro-tools compile -string protocol $BASEDIR/avro/parquet-compat.avpr $BASEDIR/gen-java
+for input in `ls $BASEDIR/avro/*.avdl`; do
+ filename=$(basename "$input")
+ filename="${filename%.*}"
+ avro-tools idl $input > $BASEDIR/avro/${filename}.avpr
+ avro-tools compile -string protocol $BASEDIR/avro/${filename}.avpr $BASEDIR/gen-java
+done
diff --git a/sql/core/src/test/scripts/gen-thrift.sh b/sql/core/src/test/scripts/gen-thrift.sh
new file mode 100755
index 0000000000..ada432c68a
--- /dev/null
+++ b/sql/core/src/test/scripts/gen-thrift.sh
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cd $(dirname $0)/..
+BASEDIR=`pwd`
+cd -
+
+rm -rf $BASEDIR/gen-java
+mkdir -p $BASEDIR/gen-java
+
+for input in `ls $BASEDIR/thrift/*.thrift`; do
+ thrift --gen java -out $BASEDIR/gen-java $input
+done
diff --git a/sql/core/src/test/thrift/parquet-compat.thrift b/sql/core/src/test/thrift/parquet-compat.thrift
index fa5ed8c623..98bf778aec 100644
--- a/sql/core/src/test/thrift/parquet-compat.thrift
+++ b/sql/core/src/test/thrift/parquet-compat.thrift
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-namespace java org.apache.spark.sql.parquet.test.thrift
+namespace java org.apache.spark.sql.execution.datasources.parquet.test.thrift
enum Suit {
SPADES,