author     Michael Armbrust <michael@databricks.com>    2014-08-27 00:59:23 -0700
committer  Michael Armbrust <michael@databricks.com>    2014-08-27 00:59:23 -0700
commit     e1139dd60e0692e8adb1337c1f605165ce4b8895 (patch)
tree       71988800d8a4072a71b552eb8ec23579129b9156 /sql/core/src/main
parent     3e2864e40472b32e6a7eec5ba3bc83562d2a1a62 (diff)
[SPARK-3237][SQL] Fix parquet filters with UDFs
Author: Michael Armbrust <michael@databricks.com>

Closes #2153 from marmbrus/parquetFilters and squashes the following commits:

712731a [Michael Armbrust] Use closure serializer for sending filters.
1e83f80 [Michael Armbrust] Clean udf functions.
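For readers skimming the change: the pattern it adopts — serializing the filter expressions with SparkEnv's closure serializer and shipping them through the Hadoop Configuration as a base64 string — can be sketched roughly as below. This is an illustrative sketch, not the commit's code: FilterShuttle and its Key are hypothetical stand-ins for ParquetFilters and PARQUET_FILTER_DATA, java.util.Base64 stands in for Guava's BaseEncoding to keep the sketch self-contained, and it assumes an active SparkEnv (i.e., a running SparkContext).

import java.nio.ByteBuffer
import java.util.Base64

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkEnv

// Illustrative round-trip of serializable filter objects through a Hadoop
// Configuration, mirroring the pattern this commit adopts. The closure
// serializer (Java serialization by default) can handle the anonymous
// function classes captured by UDFs.
object FilterShuttle {
  // Hypothetical stand-in for PARQUET_FILTER_DATA.
  val Key = "example.filter.data"

  def write(filters: Seq[AnyRef], conf: Configuration): Unit = {
    if (filters.nonEmpty) {
      val serialized: Array[Byte] =
        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
      // Base64 keeps the binary payload safe inside a string-valued conf entry.
      conf.set(Key, Base64.getEncoder.encodeToString(serialized))
    }
  }

  def read(conf: Configuration): Seq[AnyRef] = {
    val data = conf.get(Key)
    if (data != null) {
      val decoded = Base64.getDecoder.decode(data)
      SparkEnv.get.closureSerializer.newInstance()
        .deserialize[Seq[AnyRef]](ByteBuffer.wrap(decoded))
    } else {
      Seq.empty
    }
  }
}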
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 2298a9b933..fe28e0d726 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
+import java.nio.ByteBuffer
+
 import org.apache.hadoop.conf.Configuration
 
 import parquet.filter._
@@ -25,6 +27,7 @@ import parquet.column.ColumnReader
 
 import com.google.common.io.BaseEncoding
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
 import org.apache.spark.sql.catalyst.expressions._
@@ -237,7 +240,8 @@
    */
   def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
     if (filters.length > 0) {
-      val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
+      val serialized: Array[Byte] =
+        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
       val encoded: String = BaseEncoding.base64().encode(serialized)
       conf.set(PARQUET_FILTER_DATA, encoded)
     }
@@ -252,7 +256,7 @@
     val data = conf.get(PARQUET_FILTER_DATA)
     if (data != null) {
       val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkSqlSerializer.deserialize(decoded)
+      SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
     } else {
       Seq()
     }
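A note on the design choice, as a hedged reading of the commit: SparkSqlSerializer is Kryo-based and tuned for row data, so it presumably failed on the anonymous function classes a UDF predicate captures, while the closure serializer (Java serialization by default) exists precisely to ship such closures to executors. The ByteBuffer.wrap on the read side follows from SerializerInstance.deserialize taking a ByteBuffer, whereas SparkSqlSerializer.deserialize accepted the raw Array[Byte] directly, as visible in the removed line; the matching .array() on the write side converts the serializer's ByteBuffer back to bytes for base64 encoding.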