diff options
author | Sameer Agarwal <sameer@databricks.com> | 2016-06-02 10:58:00 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-06-02 10:58:00 -0700 |
commit | 09b3c56c91831b3e8d909521b8f3ffbce4eb0395 (patch) | |
tree | afa9b5044409b49bf03c86bac7b2832311b26691 | |
parent | 7c07d176f3d65235f9376898a7b10b01268c867c (diff) | |
download | spark-09b3c56c91831b3e8d909521b8f3ffbce4eb0395.tar.gz spark-09b3c56c91831b3e8d909521b8f3ffbce4eb0395.tar.bz2 spark-09b3c56c91831b3e8d909521b8f3ffbce4eb0395.zip |
[SPARK-14752][SQL] Explicitly implement KryoSerializable for LazilyGeneratedOrdering
## What changes were proposed in this pull request?
This patch fixes a number of `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException` exceptions reported in [SPARK-15604], [SPARK-14752] etc. (while executing Spark SQL queries with the Kryo serializer) by explicitly implementing `KryoSerializable` for `LazilyGeneratedOrdering`.
## How was this patch tested?
1. Modified `OrderingSuite` so that all tests in the suite also round-trip the orderings through Kryo serialization and check the deserialized copies (for both `InterpretedOrdering` and `LazilyGeneratedOrdering`).
2. Manually verified TPC-DS q1.
Author: Sameer Agarwal <sameer@databricks.com>
Closes #13466 from sameeragarwal/kryo.
2 files changed, 23 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index c10829d4f1..f4d35d232e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.expressions.codegen import java.io.ObjectInputStream +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -147,7 +150,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR /** * A lazily generated row ordering comparator. */ -class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] { +class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) + extends Ordering[InternalRow] with KryoSerializable { def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = this(ordering.map(BindReferences.bindReference(_, inputSchema))) @@ -163,6 +167,14 @@ class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[Int in.defaultReadObject() generatedOrdering = GenerateOrdering.generate(ordering) } + + override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException { + kryo.writeObject(out, ordering.toArray) + } + + override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException { + generatedOrdering = GenerateOrdering.generate(kryo.readObject(in, classOf[Array[SortOrder]])) + } } object LazilyGeneratedOrdering { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala index b190d3a00d..8cc2ab46c0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.expressions import scala.math._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering} import org.apache.spark.sql.types._ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -44,9 +45,14 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { case Ascending => signum(expected) case Descending => -1 * signum(expected) } + + val kryo = new KryoSerializer(new SparkConf).newInstance() val intOrdering = new InterpretedOrdering(sortOrder :: Nil) - val genOrdering = GenerateOrdering.generate(sortOrder :: Nil) - Seq(intOrdering, genOrdering).foreach { ordering => + val genOrdering = new LazilyGeneratedOrdering(sortOrder :: Nil) + val kryoIntOrdering = kryo.deserialize[InterpretedOrdering](kryo.serialize(intOrdering)) + val kryoGenOrdering = kryo.deserialize[LazilyGeneratedOrdering](kryo.serialize(genOrdering)) + + Seq(intOrdering, genOrdering, kryoIntOrdering, kryoGenOrdering).foreach { ordering => assert(ordering.compare(rowA, rowA) === 0) assert(ordering.compare(rowB, rowB) === 0) assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult) |