diff options
author | Hossein Falaki <falaki@gmail.com> | 2013-10-18 15:30:45 -0700 |
---|---|---|
committer | Hossein Falaki <falaki@gmail.com> | 2013-10-18 15:30:45 -0700 |
commit | 2d511ab320a85eccafbb9e51a2183b07114bbaa1 (patch) | |
tree | c7e6c9ed84d16dcfadcbabc46e61ab873f7c6d4e /core | |
parent | 13227aaa28ba7bb29b94a598b6efd45c7264d78b (diff) | |
download | spark-2d511ab320a85eccafbb9e51a2183b07114bbaa1.tar.gz spark-2d511ab320a85eccafbb9e51a2183b07114bbaa1.tar.bz2 spark-2d511ab320a85eccafbb9e51a2183b07114bbaa1.zip |
Made SerializableHyperLogLog Externalizable and added Kryo tests
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala | 11 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 4 |
2 files changed, 10 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala index 28a8accb33..9cfd41407f 100644 --- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala +++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala @@ -17,26 +17,27 @@ package org.apache.spark.util -import java.io.{ObjectOutputStream, ObjectInputStream} +import java.io.{Externalizable, ObjectOutput, ObjectInput} import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog} /** - * A wrapper around com.clearspring.analytics.stream.cardinality.HyperLogLog that is serializable. + * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable. */ private[spark] -class SerializableHyperLogLog(@transient var value: ICardinality) extends Serializable { +class SerializableHyperLogLog(var value: ICardinality) extends Externalizable { + def this() = this(null) // For deserialization def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value)) - private def readObject(in: ObjectInputStream) { + def readExternal(in: ObjectInput) { val byteLength = in.readInt() val bytes = new Array[Byte](byteLength) in.readFully(bytes) value = HyperLogLog.Builder.build(bytes) } - private def writeObject(out: ObjectOutputStream) { + def writeExternal(out: ObjectOutput) { val bytes = value.getBytes() out.writeInt(bytes.length) out.write(bytes) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index c016c51171..18529710fe 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -172,6 +172,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11)) } + test("kryo with SerializableHyperLogLog") { + assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countDistinct(0.01) === 3) + } + test("kryo with reduce") { val control = 1 :: 2 :: Nil val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)) |