| author | Michael Armbrust <michael@databricks.com> | 2014-08-27 00:59:23 -0700 |
|---|---|---|
| committer | Michael Armbrust <michael@databricks.com> | 2014-08-27 00:59:23 -0700 |
| commit | e1139dd60e0692e8adb1337c1f605165ce4b8895 | |
| tree | 71988800d8a4072a71b552eb8ec23579129b9156 | |
| parent | 3e2864e40472b32e6a7eec5ba3bc83562d2a1a62 | |
[SPARK-3237][SQL] Fix parquet filters with UDFs
Author: Michael Armbrust <michael@databricks.com>
Closes #2153 from marmbrus/parquetFilters and squashes the following commits:
712731a [Michael Armbrust] Use closure serializer for sending filters.
1e83f80 [Michael Armbrust] Clean udf functions.
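To make the failure mode concrete, here is a hypothetical reproduction sketch against the Spark 1.1-era API (the table name, column, path, and threshold are illustrative, not from the commit). A closure-capturing Scala UDF in a WHERE clause over a Parquet table exercises both code paths this patch touches: the UDF's function is now cleaned in `ScalaUdf`, and the filter expressions are now sent through the closure serializer.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("repro").setMaster("local"))
val sqlContext = new SQLContext(sc)

// A UDF whose body captures a local variable, so the generated function
// object drags along an enclosing closure that must be cleaned and
// serializable before it can be shipped with the filter expressions.
val threshold = 5
sqlContext.registerFunction("overThreshold", (x: Int) => x > threshold)

// Hypothetical Parquet data; any Parquet-backed table would do.
sqlContext.parquetFile("/tmp/example.parquet").registerTempTable("t")
sqlContext.sql("SELECT * FROM t WHERE overThreshold(key)").collect()
```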
| -rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala | 4 |
| -rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala | 8 |

2 files changed, 10 insertions(+), 2 deletions(-)
```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index 63ac2a608b..0b3c1df453 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -18,10 +18,14 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.util.ClosureCleaner
 
 case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
   extends Expression {
 
+  // Clean function when not called with default no-arg constructor.
+  if (function != null) { ClosureCleaner.clean(function) }
+
   type EvaluatedType = Any
 
   def nullable = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 2298a9b933..fe28e0d726 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
+import java.nio.ByteBuffer
+
 import org.apache.hadoop.conf.Configuration
 
 import parquet.filter._
@@ -25,6 +27,7 @@ import parquet.column.ColumnReader
 
 import com.google.common.io.BaseEncoding
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
 import org.apache.spark.sql.catalyst.expressions._
@@ -237,7 +240,8 @@ object ParquetFilters {
    */
   def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
     if (filters.length > 0) {
-      val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
+      val serialized: Array[Byte] =
+        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
       val encoded: String = BaseEncoding.base64().encode(serialized)
       conf.set(PARQUET_FILTER_DATA, encoded)
     }
@@ -252,7 +256,7 @@ object ParquetFilters {
     val data = conf.get(PARQUET_FILTER_DATA)
     if (data != null) {
       val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkSqlSerializer.deserialize(decoded)
+      SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
     } else {
       Seq()
     }
```
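For reference, a minimal sketch of the serialize/encode/decode/deserialize round trip that `ParquetFilters` now performs, assuming a live `SparkEnv` (i.e. code running inside a Spark application) and using a plain `Seq[String]` as a stand-in for the real `Seq[Expression]`:

```scala
import java.nio.ByteBuffer

import com.google.common.io.BaseEncoding

import org.apache.spark.SparkEnv

// Stand-in payload; ParquetFilters serializes Seq[Expression] instead.
val filters: Seq[String] = Seq("a > 5", "b = 'x'")

// Driver side: serialize with the closure serializer, then base64-encode
// so the bytes can be stored as a String in the Hadoop Configuration.
val ser = SparkEnv.get.closureSerializer.newInstance()
val encoded: String = BaseEncoding.base64().encode(ser.serialize(filters).array())

// Task side: decode and deserialize with a fresh serializer instance.
val decoded: Array[Byte] = BaseEncoding.base64().decode(encoded)
val roundTripped: Seq[String] =
  SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
```

The closure serializer is Java serialization by default, which can handle the anonymous function classes carried inside `ScalaUdf` expressions; the Kryo-based `SparkSqlSerializer` used previously could not, which appears to be why the patch switches serializers here.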