about summary refs log tree commit diff
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-03-26 18:19:49 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-03-26 18:19:49 -0700
commite15e57413e07e5d4787514702f735bba0c30cae5 (patch)
treeb20d0dbbfe10507d8ada07f459c25d13a0311e26
parent32cbdfd2887f7a792f360ac3224f8c38cc97d21f (diff)
downloadspark-e15e57413e07e5d4787514702f735bba0c30cae5.tar.gz
spark-e15e57413e07e5d4787514702f735bba0c30cae5.tar.bz2
spark-e15e57413e07e5d4787514702f735bba0c30cae5.zip
[SQL] Add a custom serializer for maps since they do not have a no-arg constructor.
Author: Michael Armbrust <michael@databricks.com> Closes #243 from marmbrus/mapSer and squashes the following commits: 54045f7 [Michael Armbrust] Add a custom serializer for maps since they do not have a no-arg constructor.
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala | 18
1 files changed, 18 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 1c3196ae2e..915f551fb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -32,6 +32,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
kryo.setRegistrationRequired(false)
kryo.register(classOf[MutablePair[_, _]])
kryo.register(classOf[Array[Any]])
+ kryo.register(classOf[scala.collection.immutable.Map$Map1], new MapSerializer)
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
@@ -70,3 +71,20 @@ class BigDecimalSerializer extends Serializer[BigDecimal] {
BigDecimal(input.readString())
}
}
+
+/**
+ * Maps do not have a no arg constructor and so cannot be serialized by default. So, we serialize
+ * them as `Array[(k,v)]`.
+ */
+class MapSerializer extends Serializer[Map[_,_]] {
+ def write(kryo: Kryo, output: Output, map: Map[_,_]) {
+ kryo.writeObject(output, map.flatMap(e => Seq(e._1, e._2)).toArray)
+ }
+
+ def read(kryo: Kryo, input: Input, tpe: Class[Map[_,_]]): Map[_,_] = {
+ kryo.readObject(input, classOf[Array[Any]])
+ .sliding(2,2)
+ .map { case Array(k,v) => (k,v) }
+ .toMap
+ }
+}