diff options
author | Volodymyr Lyubinets <vlyubin@gmail.com> | 2015-04-10 12:09:54 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-04-10 12:09:54 -0700 |
commit | b9baa4cd9f6e9fc58161f79744b6b7729894d920 (patch) | |
tree | 95f284abcfe02865198a8f0f8bf508c957140b73 /sql | |
parent | 9f5ed99d644949443d19c4895de6e0ece4be24d0 (diff) | |
download | spark-b9baa4cd9f6e9fc58161f79744b6b7729894d920.tar.gz spark-b9baa4cd9f6e9fc58161f79744b6b7729894d920.tar.bz2 spark-b9baa4cd9f6e9fc58161f79744b6b7729894d920.zip |
[SQL] [SPARK-6794] Use kryo-based SparkSqlSerializer for GeneralHashedRelation
Benchmarking results: http://pastie.org/private/1dneo1mta5zpsw6gmsoeq
Author: Volodymyr Lyubinets <vlyubin@gmail.com>
Closes #5433 from vlyubin/joins and squashes the following commits:
d70c829 [Volodymyr Lyubinets] Addressed review feedback
527eac6 [Volodymyr Lyubinets] Use kryo-based SparkSqlSerializer for GeneralHashedRelation
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala | 3 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 47 |
2 files changed, 44 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index 967bd76b30..347e2f4a1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -26,14 +26,13 @@ import scala.reflect.ClassTag import com.clearspring.analytics.stream.cardinality.HyperLogLog import com.esotericsoftware.kryo.io.{Input, Output} import com.esotericsoftware.kryo.{Serializer, Kryo} -import com.twitter.chill.{AllScalaRegistrar, ResourcePool} +import com.twitter.chill.ResourcePool import org.apache.spark.{SparkEnv, SparkConf} import org.apache.spark.serializer.{SerializerInstance, KryoSerializer} import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.util.collection.OpenHashSet import org.apache.spark.util.MutablePair -import org.apache.spark.util.Utils import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHashSet} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 2fa1cf5add..ab84c123e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution.joins +import java.io.{ObjectInput, ObjectOutput, Externalizable} import java.util.{HashMap => JavaHashMap} import org.apache.spark.sql.catalyst.expressions.{Projection, Row} +import org.apache.spark.sql.execution.SparkSqlSerializer import org.apache.spark.util.collection.CompactBuffer @@ -29,16 +31,43 @@ import org.apache.spark.util.collection.CompactBuffer */ private[joins] sealed trait HashedRelation { def get(key: Row): CompactBuffer[Row] + + // This is a helper method to implement Externalizable, and is used by + // GeneralHashedRelation and UniqueKeyHashedRelation + protected def writeBytes(out: ObjectOutput, serialized: Array[Byte]): Unit = { + out.writeInt(serialized.length) // Write the length of serialized bytes first + out.write(serialized) + } + + // This is a helper method to implement Externalizable, and is used by + // GeneralHashedRelation and UniqueKeyHashedRelation + protected def readBytes(in: ObjectInput): Array[Byte] = { + val serializedSize = in.readInt() // Read the length of serialized bytes first + val bytes = new Array[Byte](serializedSize) + in.readFully(bytes) + bytes + } } /** * A general [[HashedRelation]] backed by a hash map that maps the key into a sequence of values. */ -private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, CompactBuffer[Row]]) - extends HashedRelation with Serializable { +private[joins] final class GeneralHashedRelation( + private var hashTable: JavaHashMap[Row, CompactBuffer[Row]]) + extends HashedRelation with Externalizable { + + def this() = this(null) // Needed for serialization override def get(key: Row): CompactBuffer[Row] = hashTable.get(key) + + override def writeExternal(out: ObjectOutput): Unit = { + writeBytes(out, SparkSqlSerializer.serialize(hashTable)) + } + + override def readExternal(in: ObjectInput): Unit = { + hashTable = SparkSqlSerializer.deserialize(readBytes(in)) + } } @@ -46,8 +75,10 @@ private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, Com * A specialized [[HashedRelation]] that maps key into a single value. This implementation * assumes the key is unique. */ -private[joins] final class UniqueKeyHashedRelation(hashTable: JavaHashMap[Row, Row]) - extends HashedRelation with Serializable { +private[joins] final class UniqueKeyHashedRelation(private var hashTable: JavaHashMap[Row, Row]) + extends HashedRelation with Externalizable { + + def this() = this(null) // Needed for serialization override def get(key: Row): CompactBuffer[Row] = { val v = hashTable.get(key) @@ -55,6 +86,14 @@ private[joins] final class UniqueKeyHashedRelation(hashTable: JavaHashMap[Row, R } def getValue(key: Row): Row = hashTable.get(key) + + override def writeExternal(out: ObjectOutput): Unit = { + writeBytes(out, SparkSqlSerializer.serialize(hashTable)) + } + + override def readExternal(in: ObjectInput): Unit = { + hashTable = SparkSqlSerializer.deserialize(readBytes(in)) + } } |