aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-03-10 15:48:23 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-03-10 15:48:23 -0700
commit91a9d093bdc1d8d0bc26a70fa11d1e9dc3e82820 (patch)
tree7230b510c424b9ca4ddd48aab1c119840b3addaf /core
parent557cfd0f4ded3f78dd52e20bf4f15c7678dbca88 (diff)
parent664e5fd24b64d5062b461a9e4f0b72cec106cdbc (diff)
downloadspark-91a9d093bdc1d8d0bc26a70fa11d1e9dc3e82820.tar.gz
spark-91a9d093bdc1d8d0bc26a70fa11d1e9dc3e82820.tar.bz2
spark-91a9d093bdc1d8d0bc26a70fa11d1e9dc3e82820.zip
Merge pull request #512 from patelh/fix-kryo-serializer
Fix reference bug in Kryo serializer, add test, update version
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/KryoSerializer.scala29
-rw-r--r--core/src/test/scala/spark/KryoSerializerSuite.scala1
2 files changed, 19 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 0bd73e936b..d723ab7b1e 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -157,27 +157,34 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
// Register maps with a special serializer since they have complex internal structure
class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any])
- extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
+ extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
+
+ //hack, look at https://groups.google.com/forum/#!msg/kryo-users/Eu5V4bxCfws/k-8UQ22y59AJ
+ private final val FAKE_REFERENCE = new Object()
override def write(
- kryo: Kryo,
- output: KryoOutput,
- obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
+ kryo: Kryo,
+ output: KryoOutput,
+ obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
val map = obj.asInstanceOf[scala.collection.Map[Any, Any]]
- kryo.writeObject(output, map.size.asInstanceOf[java.lang.Integer])
+ output.writeInt(map.size)
for ((k, v) <- map) {
kryo.writeClassAndObject(output, k)
kryo.writeClassAndObject(output, v)
}
}
override def read (
- kryo: Kryo,
- input: KryoInput,
- cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
+ kryo: Kryo,
+ input: KryoInput,
+ cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
: Array[(Any, Any)] => scala.collection.Map[Any, Any] = {
- val size = kryo.readObject(input, classOf[java.lang.Integer]).intValue
+ kryo.reference(FAKE_REFERENCE)
+ val size = input.readInt()
val elems = new Array[(Any, Any)](size)
- for (i <- 0 until size)
- elems(i) = (kryo.readClassAndObject(input), kryo.readClassAndObject(input))
+ for (i <- 0 until size) {
+ val k = kryo.readClassAndObject(input)
+ val v = kryo.readClassAndObject(input)
+ elems(i)=(k,v)
+ }
buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]]
}
}
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 06d446ea24..327e2ff848 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -82,6 +82,7 @@ class KryoSerializerSuite extends FunSuite {
check(mutable.HashMap(1 -> "one", 2 -> "two"))
check(mutable.HashMap("one" -> 1, "two" -> 2))
check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4))))
+ check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
}
test("custom registrator") {