author    Cheng Lian <lian@databricks.com>    2015-10-21 09:02:20 +0800
committer Cheng Lian <lian@databricks.com>    2015-10-21 09:02:59 +0800
commit    89e6db6150704deab46232352d1986bc1449883b (patch)
tree      d292f300840ea50c661bcf156384f8eed1fd3755 /sql
parent    aea7142c9802d1e855443c01621ebc8d57be8c5e (diff)
[SPARK-11153][SQL] Disables Parquet filter push-down for string and binary columns
Due to PARQUET-251, `BINARY` columns in existing Parquet files may be written with corrupted statistics information. This information is used by filter push-down optimization. Since Spark 1.5 turns on Parquet filter push-down by default, we may end up with wrong query results. PARQUET-251 has been fixed in parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0.

This affects all Spark SQL data types that can be mapped to Parquet `BINARY`, namely:

- `StringType`
- `BinaryType`
- `DecimalType` (but Spark SQL doesn't support pushing down filters involving `DecimalType` columns for now)

To avoid wrong query results, we should disable filter push-down for columns of `StringType` and `BinaryType` until we upgrade to parquet-mr 1.8.

Author: Cheng Lian <lian@databricks.com>

Closes #9152 from liancheng/spark-11153.workaround-parquet-251.

(cherry picked from commit 0887e5e87891e8e22f534ca6d0406daf86ec2dad)
Signed-off-by: Cheng Lian <lian@databricks.com>
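For users stuck on an affected build, a minimal user-level workaround is to turn Parquet filter push-down off entirely via the `spark.sql.parquet.filterPushdown` option, rather than per type as this patch does. A sketch against the Spark 1.5 `SQLContext` API (the input path is hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Minimal sketch: disable Parquet filter push-down globally so that no
// (possibly corrupted, per PARQUET-251) Parquet statistics are consulted
// during scans. This trades scan performance for correctness.
object DisablePushdownWorkaround {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pushdown-workaround").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")

    // Filters on BINARY-backed columns (StringType, BinaryType) are now
    // evaluated by Spark after the scan instead of inside parquet-mr.
    val df = sqlContext.read.parquet("/path/to/data.parquet") // hypothetical path
    df.filter(df("name") === "foo").show()

    sc.stop()
  }
}
```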
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala | 27
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala |  6
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 78040d99fb..0771432937 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -53,6 +53,8 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
// Binary.fromString and Binary.fromByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.eq(
@@ -62,6 +64,7 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ */
}
private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -75,6 +78,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
@@ -83,6 +89,7 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ */
}
private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -94,6 +101,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
@@ -101,6 +111,7 @@ private[sql] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -112,6 +123,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
@@ -119,6 +133,7 @@ private[sql] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -130,6 +145,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
@@ -137,6 +155,7 @@ private[sql] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -148,6 +167,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n),
@@ -155,6 +177,7 @@ private[sql] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -170,6 +193,9 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
@@ -178,6 +204,7 @@ private[sql] object ParquetFilters {
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+ */
}
/**
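Why commenting the cases out is a safe way to disable push-down: each `make*` converter above is a `PartialFunction[DataType, ...]`, so removing a case simply leaves the function undefined for that type, and the caller falls back to not building a Parquet predicate at all. A simplified sketch of that dispatch pattern (the `createFilter` wrapper here is paraphrased for illustration, not the full Spark code):

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.types._

// Simplified sketch of the dispatch pattern used by ParquetFilters:
// a PartialFunction keyed on DataType. Types with no entry (here,
// StringType and BinaryType, per SPARK-11153) yield None from `lift`,
// so the filter is evaluated by Spark after the scan instead of being
// pushed down into parquet-mr.
object PushdownDispatchSketch {
  val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
    case IntegerType =>
      (n: String, v: Any) =>
        FilterApi.eq(FilterApi.intColumn(n), v.asInstanceOf[java.lang.Integer])
    // StringType / BinaryType intentionally absent.
  }

  // Hypothetical caller mirroring the shape of ParquetFilters.createFilter:
  def createFilter(dataType: DataType, name: String, value: Any): Option[FilterPredicate] =
    makeEq.lift(dataType).map(_(name, value))
}

// PushdownDispatchSketch.createFilter(StringType, "s", "foo") == None
//   -> no push-down; Spark evaluates the filter itself.
```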
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 7a23f57f40..13fdd555a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -219,7 +219,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
- test("filter pushdown - string") {
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ ignore("filter pushdown - string") {
withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(
@@ -247,7 +248,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
- test("filter pushdown - binary") {
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ ignore("filter pushdown - binary") {
implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes("UTF-8")
}
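On the test side, swapping `test` for ScalaTest's `ignore` keeps the suites compiling and type-checked while the runner reports the cases as ignored, so they can be re-enabled by swapping the keyword back after the parquet-mr upgrade. A standalone sketch of the mechanism, outside Spark's test harness:

```scala
import org.scalatest.FunSuite

// `ignore` shares `test`'s signature, so the body still compiles and is
// type-checked; the runner just skips it and reports the case as ignored.
class PushdownSuiteSketch extends FunSuite {
  ignore("filter pushdown - string") {
    // Re-enable by changing `ignore` back to `test` once parquet-mr >= 1.8.1.
    assert("1".getBytes("UTF-8").nonEmpty)
  }
}
```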