about | summary | refs | log | tree | commit | diff
path: root/sql/catalyst
diff options
context:
space:
mode:
author: Sameer Agarwal <sameer@databricks.com> 2016-06-02 10:58:00 -0700
committer: Reynold Xin <rxin@databricks.com> 2016-06-02 10:58:00 -0700
commit09b3c56c91831b3e8d909521b8f3ffbce4eb0395 (patch)
treeafa9b5044409b49bf03c86bac7b2832311b26691 /sql/catalyst
parent7c07d176f3d65235f9376898a7b10b01268c867c (diff)
downloadspark-09b3c56c91831b3e8d909521b8f3ffbce4eb0395.tar.gz
spark-09b3c56c91831b3e8d909521b8f3ffbce4eb0395.tar.bz2
spark-09b3c56c91831b3e8d909521b8f3ffbce4eb0395.zip
[SPARK-14752][SQL] Explicitly implement KryoSerialization for LazilyGenerateOrdering
## What changes were proposed in this pull request?

This patch fixes a number of `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException` exceptions reported in [SPARK-15604], [SPARK-14752] etc. (while executing Spark SQL queries with the Kryo serializer) by explicitly implementing `KryoSerializable` for `LazilyGeneratedOrdering`.

## How was this patch tested?

1. Modified `OrderingSuite` so that all tests in the suite also test Kryo serialization (for both interpreted and generated ordering).
2. Manually verified TPC-DS q1.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13466 from sameeragarwal/kryo.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala14
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala14
2 files changed, 23 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index c10829d4f1..f4d35d232e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import java.io.ObjectInputStream
+import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
+import com.esotericsoftware.kryo.io.{Input, Output}
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -147,7 +150,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
/**
* A lazily generated row ordering comparator.
*/
-class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+class LazilyGeneratedOrdering(val ordering: Seq[SortOrder])
+ extends Ordering[InternalRow] with KryoSerializable {
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
this(ordering.map(BindReferences.bindReference(_, inputSchema)))
@@ -163,6 +167,14 @@ class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[Int
in.defaultReadObject()
generatedOrdering = GenerateOrdering.generate(ordering)
}
+
+ override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException {
+ kryo.writeObject(out, ordering.toArray)
+ }
+
+ override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
+ generatedOrdering = GenerateOrdering.generate(kryo.readObject(in, classOf[Array[SortOrder]]))
+ }
}
object LazilyGeneratedOrdering {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
index b190d3a00d..8cc2ab46c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.expressions
import scala.math._
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
+import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering}
import org.apache.spark.sql.types._
class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -44,9 +45,14 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
case Ascending => signum(expected)
case Descending => -1 * signum(expected)
}
+
+ val kryo = new KryoSerializer(new SparkConf).newInstance()
val intOrdering = new InterpretedOrdering(sortOrder :: Nil)
- val genOrdering = GenerateOrdering.generate(sortOrder :: Nil)
- Seq(intOrdering, genOrdering).foreach { ordering =>
+ val genOrdering = new LazilyGeneratedOrdering(sortOrder :: Nil)
+ val kryoIntOrdering = kryo.deserialize[InterpretedOrdering](kryo.serialize(intOrdering))
+ val kryoGenOrdering = kryo.deserialize[LazilyGeneratedOrdering](kryo.serialize(genOrdering))
+
+ Seq(intOrdering, genOrdering, kryoIntOrdering, kryoGenOrdering).foreach { ordering =>
assert(ordering.compare(rowA, rowA) === 0)
assert(ordering.compare(rowB, rowB) === 0)
assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult)