author    Michael Armbrust <michael@databricks.com>  2014-08-27 00:59:23 -0700
committer Michael Armbrust <michael@databricks.com>  2014-08-27 00:59:54 -0700
commit    ca01de1b98ae17d9f85dbd07e3546c985061c8a5
tree      101d24a7320b951e1d06cc8ed30ea2247238799b /sql/core
parent    5cf1e440137006eedd6846ac8fa57ccf9fd1958d
[SPARK-3237][SQL] Fix parquet filters with UDFs
Author: Michael Armbrust <michael@databricks.com>

Closes #2153 from marmbrus/parquetFilters and squashes the following commits:

712731a [Michael Armbrust] Use closure serializer for sending filters.
1e83f80 [Michael Armbrust] Clean udf functions.

(cherry picked from commit e1139dd60e0692e8adb1337c1f605165ce4b8895)
Signed-off-by: Michael Armbrust <michael@databricks.com>
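The change swaps SparkSqlSerializer for Spark's closure serializer when shipping filter expressions to tasks: predicates built from UDFs carry Scala function objects, which the closure serializer (Java serialization by default) is designed to handle. As a standalone illustration, not part of the patch, the sketch below shows that a Scala closure, standing in for a UDF's function object, survives a plain Java-serialization round trip; all names in it are invented for illustration.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Illustration only (not Spark code): a Scala closure capturing local
// state round-trips through Java serialization, which is what Spark's
// default closure serializer uses under the hood.
object ClosureRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val threshold = 10                          // captured local state
    val udfLike: Int => Boolean = _ > threshold // stands in for a UDF predicate

    val out = new ByteArrayOutputStream()
    new ObjectOutputStream(out).writeObject(udfLike)

    val restored = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
      .readObject().asInstanceOf[Int => Boolean]

    assert(restored(42) && !restored(3))        // behaves like the original
  }
}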
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 2298a9b933..fe28e0d726 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
+import java.nio.ByteBuffer
+
 import org.apache.hadoop.conf.Configuration
 
 import parquet.filter._
@@ -25,6 +27,7 @@ import parquet.column.ColumnReader
 
 import com.google.common.io.BaseEncoding
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
 import org.apache.spark.sql.catalyst.expressions._
@@ -237,7 +240,8 @@
    */
   def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
     if (filters.length > 0) {
-      val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
+      val serialized: Array[Byte] =
+        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
       val encoded: String = BaseEncoding.base64().encode(serialized)
       conf.set(PARQUET_FILTER_DATA, encoded)
     }
@@ -252,7 +256,7 @@
     val data = conf.get(PARQUET_FILTER_DATA)
     if (data != null) {
       val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkSqlSerializer.deserialize(decoded)
+      SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
     } else {
       Seq()
     }
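Taken together, the two patched methods transport filters as: closure-serialize the expressions, base64-encode the bytes into the Hadoop Configuration, then decode and deserialize on the read side. A minimal sketch of that round trip, not the committed code: FILTER_KEY and the object name are hypothetical stand-ins (the real key is PARQUET_FILTER_DATA), the element type is loosened to AnyRef to keep the sketch self-contained, and a live SparkEnv (i.e. a running SparkContext) is assumed.

import java.nio.ByteBuffer

import com.google.common.io.BaseEncoding
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkEnv

object FilterTransportSketch {
  val FILTER_KEY = "example.parquet.filter.data" // hypothetical config key

  // Serialize with the closure serializer, base64-encode into the conf.
  def store(filters: Seq[AnyRef], conf: Configuration): Unit = {
    if (filters.nonEmpty) {
      val bytes = SparkEnv.get.closureSerializer.newInstance()
        .serialize(filters).array()
      conf.set(FILTER_KEY, BaseEncoding.base64().encode(bytes))
    }
  }

  // Reverse the steps: decode the base64 string, then deserialize.
  def load(conf: Configuration): Seq[AnyRef] = {
    Option(conf.get(FILTER_KEY)).map { encoded =>
      val bytes = BaseEncoding.base64().decode(encoded)
      SparkEnv.get.closureSerializer.newInstance()
        .deserialize[Seq[AnyRef]](ByteBuffer.wrap(bytes))
    }.getOrElse(Seq.empty)
  }
}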