author     Volodymyr Lyubinets <vlyubin@gmail.com>    2015-04-10 12:09:54 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-04-10 12:09:54 -0700
commit     b9baa4cd9f6e9fc58161f79744b6b7729894d920
tree       95f284abcfe02865198a8f0f8bf508c957140b73 /sql
parent     9f5ed99d644949443d19c4895de6e0ece4be24d0
[SQL] [SPARK-6794] Use kryo-based SparkSqlSerializer for GeneralHashedRelation
Benchmarking results: http://pastie.org/private/1dneo1mta5zpsw6gmsoeq

Author: Volodymyr Lyubinets <vlyubin@gmail.com>

Closes #5433 from vlyubin/joins and squashes the following commits:

d70c829 [Volodymyr Lyubinets] Addressed review feedback
527eac6 [Volodymyr Lyubinets] Use kryo-based SparkSqlSerializer for GeneralHashedRelation
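The substance of the patch: rather than letting default Java serialization walk the backing JavaHashMap entry by entry, each relation now implements java.io.Externalizable and ships the whole table as one length-prefixed block of Kryo bytes produced by SparkSqlSerializer. A minimal, self-contained sketch of that pattern, assuming only Kryo 2.x on the classpath (KryoBackedMap, its String key/value types, and the inline Kryo setup are illustrative stand-ins, not code from this patch, which goes through SparkSqlSerializer instead):

import java.io.{ByteArrayOutputStream, Externalizable, ObjectInput, ObjectOutput}
import java.util.{HashMap => JavaHashMap}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical stand-in for the relations below: an Externalizable wrapper
// that serializes its hash table as a length-prefixed Kryo payload.
class KryoBackedMap(private var table: JavaHashMap[String, String])
  extends Externalizable {

  def this() = this(null) // Externalizable requires a public no-arg constructor

  def get(key: String): String = table.get(key)

  private def newKryo(): Kryo = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false) // accept unregistered classes
    kryo
  }

  override def writeExternal(out: ObjectOutput): Unit = {
    val bos = new ByteArrayOutputStream()
    val output = new Output(bos)
    newKryo().writeClassAndObject(output, table) // Kryo-encode the map
    output.close()
    val bytes = bos.toByteArray
    out.writeInt(bytes.length) // write the length of the payload first
    out.write(bytes)
  }

  override def readExternal(in: ObjectInput): Unit = {
    val bytes = new Array[Byte](in.readInt()) // read the length prefix first
    in.readFully(bytes)
    table = newKryo().readClassAndObject(new Input(bytes))
      .asInstanceOf[JavaHashMap[String, String]]
  }
}

The length prefix is what makes readExternal workable: ObjectInput hands the reader an undelimited stream, so it must learn how many bytes belong to the Kryo payload before passing them to the deserializer. That is exactly what the writeBytes/readBytes helpers in the diff below encode.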
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala   |  3 +--
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 47 +++++++++++++++++++++++++++++++++++++++++++++----
2 files changed, 44 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 967bd76b30..347e2f4a1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -26,14 +26,13 @@ import scala.reflect.ClassTag
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Serializer, Kryo}
-import com.twitter.chill.{AllScalaRegistrar, ResourcePool}
+import com.twitter.chill.ResourcePool
 
 import org.apache.spark.{SparkEnv, SparkConf}
 import org.apache.spark.serializer.{SerializerInstance, KryoSerializer}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.util.collection.OpenHashSet
 import org.apache.spark.util.MutablePair
-import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHashSet}
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 2fa1cf5add..ab84c123e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution.joins
 
+import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.util.{HashMap => JavaHashMap}
 
 import org.apache.spark.sql.catalyst.expressions.{Projection, Row}
+import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.apache.spark.util.collection.CompactBuffer
 
 
@@ -29,16 +31,43 @@ import org.apache.spark.util.collection.CompactBuffer
  */
 private[joins] sealed trait HashedRelation {
   def get(key: Row): CompactBuffer[Row]
+
+  // This is a helper method to implement Externalizable, and is used by
+  // GeneralHashedRelation and UniqueKeyHashedRelation
+  protected def writeBytes(out: ObjectOutput, serialized: Array[Byte]): Unit = {
+    out.writeInt(serialized.length) // Write the length of serialized bytes first
+    out.write(serialized)
+  }
+
+  // This is a helper method to implement Externalizable, and is used by
+  // GeneralHashedRelation and UniqueKeyHashedRelation
+  protected def readBytes(in: ObjectInput): Array[Byte] = {
+    val serializedSize = in.readInt() // Read the length of serialized bytes first
+    val bytes = new Array[Byte](serializedSize)
+    in.readFully(bytes)
+    bytes
+  }
 }
 
 
 /**
  * A general [[HashedRelation]] backed by a hash map that maps the key into a sequence of values.
  */
-private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, CompactBuffer[Row]])
-  extends HashedRelation with Serializable {
+private[joins] final class GeneralHashedRelation(
+    private var hashTable: JavaHashMap[Row, CompactBuffer[Row]])
+  extends HashedRelation with Externalizable {
+
+  def this() = this(null) // Needed for serialization
 
   override def get(key: Row): CompactBuffer[Row] = hashTable.get(key)
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    writeBytes(out, SparkSqlSerializer.serialize(hashTable))
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    hashTable = SparkSqlSerializer.deserialize(readBytes(in))
+  }
 }
 
 
@@ -46,8 +75,10 @@ private[joins] final class GeneralHashedRelation(hashTable: JavaHashMap[Row, Com
  * A specialized [[HashedRelation]] that maps key into a single value. This implementation
  * assumes the key is unique.
  */
-private[joins] final class UniqueKeyHashedRelation(hashTable: JavaHashMap[Row, Row])
-  extends HashedRelation with Serializable {
+private[joins] final class UniqueKeyHashedRelation(private var hashTable: JavaHashMap[Row, Row])
+  extends HashedRelation with Externalizable {
+
+  def this() = this(null) // Needed for serialization
 
   override def get(key: Row): CompactBuffer[Row] = {
     val v = hashTable.get(key)
@@ -55,6 +86,14 @@ private[joins] final class UniqueKeyHashedRelation(hashTable: JavaHashMap[Row, R
   }
 
   def getValue(key: Row): Row = hashTable.get(key)
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    writeBytes(out, SparkSqlSerializer.serialize(hashTable))
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    hashTable = SparkSqlSerializer.deserialize(readBytes(in))
+  }
 }
 
 
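For completeness, a hedged round-trip check of the sketch above (RoundTripCheck and KryoBackedMap are hypothetical, not part of this patch; Spark exercises the same writeExternal/readExternal hooks whenever a HashedRelation is serialized, e.g. for a broadcast join):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.{HashMap => JavaHashMap}

object RoundTripCheck {
  def main(args: Array[String]): Unit = {
    val m = new JavaHashMap[String, String]()
    m.put("k", "v")

    // writeObject sees an Externalizable and calls writeExternal:
    // the map crosses the wire as one length-prefixed Kryo payload.
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(new KryoBackedMap(m))
    oos.close()

    // readObject invokes the no-arg constructor, then readExternal.
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    val restored = ois.readObject().asInstanceOf[KryoBackedMap]
    assert(restored.get("k") == "v") // the entry survives the round trip
  }
}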