author    Cheng Lian <lian@databricks.com>    2017-02-06 09:10:55 +0100
committer Reynold Xin <rxin@databricks.com>  2017-02-06 09:10:55 +0100
commit    7730426cb95eec2652a9ea979ae2c4faf7e585f2 (patch)
tree      9d227166a8326277ae5cb795e212ea3b8f1beb10
parent    65b10ffb3883cfed5b182db20b55a52ee0d89cba (diff)
[SPARK-19409][SPARK-17213] Clean up Parquet workarounds/hacks due to bugs in old Parquet versions
## What changes were proposed in this pull request?

We've already upgraded parquet-mr to 1.8.2. This PR does some further cleanup by removing a workaround for PARQUET-686 and a hack due to PARQUET-363 and PARQUET-278. All three Parquet issues are fixed in parquet-mr 1.8.2.

## How was this patch tested?

Existing unit tests.

Author: Cheng Lian <lian@databricks.com>

Closes #16791 from liancheng/parquet-1.8.2-cleanup.
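For context, the PARQUET-686 workaround (SPARK-17213) had disabled predicate pushdown for string and binary columns, because parquet-mr 1.8.1 handled min/max statistics for binary data incorrectly; parquet-mr 1.8.2 resolves this, so those filters are safe to re-enable. A minimal usage sketch of what this change affects (the path, column name, and data layout below are illustrative, not from this patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("string-pushdown-demo").getOrCreate()
import spark.implicits._

// With parquet-mr 1.8.2 and this cleanup, the string equality predicate
// below can again be pushed down into the Parquet reader as a row-group
// filter, instead of being evaluated only after the scan.
spark.read.parquet("/tmp/users")
  .filter($"name" === "spark")
  .show()
```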
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala          | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala  | 17
2 files changed, 2 insertions(+), 33 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 2efeb807a5..a6a6cef586 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -41,8 +41,6 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
-    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
@@ -52,7 +50,6 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    */
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -67,8 +64,6 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
-    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
@@ -77,7 +72,6 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    */
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -90,8 +84,6 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
@@ -99,7 +91,6 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    */
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -112,8 +103,6 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
@@ -121,7 +110,6 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    */
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -134,8 +122,6 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
@@ -143,7 +129,6 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    */
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -156,8 +141,6 @@ private[parquet] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
@@ -165,7 +148,6 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    */
   }
 
   /**
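The hunks above un-comment the StringType and BinaryType cases in the six predicate builders (makeEq, makeNotEq, makeLt, makeLtEq, makeGt, makeGtEq), each a PartialFunction from a Catalyst DataType to a Parquet FilterPredicate factory. A minimal, self-contained sketch of that pattern, using a pared-down stand-in for makeEq (the names makeEqSketch and pred, and the column/value, are illustrative, not part of the patch):

```scala
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.binaryColumn
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.io.api.Binary
import org.apache.spark.sql.types.{DataType, StringType}

// Pared-down stand-in for ParquetFilters.makeEq, keeping only the
// StringType case that this patch re-enables. Binary.fromString rejects
// nulls, so a null comparison value is mapped to a null Binary instead.
val makeEqSketch: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
  case StringType =>
    (n: String, v: Any) => FilterApi.eq(
      binaryColumn(n),
      Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
}

// Lift the partial function: types without a pushdown rule yield None.
val pred: Option[FilterPredicate] =
  makeEqSketch.lift(StringType).map(_.apply("name", "spark"))
```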
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index b4f36ce375..66d4027edf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -546,21 +546,8 @@ private[parquet] class ParquetSchemaConverter(
 private[parquet] object ParquetSchemaConverter {
   val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
 
-  // !! HACK ALERT !!
-  //
-  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType,
-  // which prevents us to avoid selecting any columns for queries like `SELECT COUNT(*) FROM t`.
-  // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
-  //
-  // To workaround this problem, here we first construct a `MessageType` with a single dummy
-  // field, and then remove the field to obtain an empty `MessageType`.
-  //
-  // TODO Reverts this change after upgrading parquet-mr to 1.8.2+
-  val EMPTY_MESSAGE = Types
-    .buildMessage()
-    .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
-    .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
-  EMPTY_MESSAGE.getFields.clear()
+  val EMPTY_MESSAGE: MessageType =
+    Types.buildMessage().named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
 
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
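With PARQUET-363 and PARQUET-278 fixed, parquet-mr 1.8.2 allows a group type with zero fields, which is what the simplified EMPTY_MESSAGE relies on: the dummy-field-then-clear hack is no longer needed. A minimal standalone sketch of the new construction, assuming parquet-column 1.8.2+ (the artifact providing org.apache.parquet.schema.Types) is on the classpath:

```scala
import org.apache.parquet.schema.{MessageType, Types}

// With parquet-mr 1.8.2+, a message type may be finalized with no fields,
// so an empty schema (e.g. for `SELECT COUNT(*) FROM t`) is built directly.
val emptyMessage: MessageType =
  Types.buildMessage().named("spark_schema")

assert(emptyMessage.getFields.isEmpty)
```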