aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorHossein Falaki <falaki@gmail.com>2013-10-18 15:30:45 -0700
committerHossein Falaki <falaki@gmail.com>2013-10-18 15:30:45 -0700
commit2d511ab320a85eccafbb9e51a2183b07114bbaa1 (patch)
treec7e6c9ed84d16dcfadcbabc46e61ab873f7c6d4e /core
parent13227aaa28ba7bb29b94a598b6efd45c7264d78b (diff)
downloadspark-2d511ab320a85eccafbb9e51a2183b07114bbaa1.tar.gz
spark-2d511ab320a85eccafbb9e51a2183b07114bbaa1.tar.bz2
spark-2d511ab320a85eccafbb9e51a2183b07114bbaa1.zip
Made SerializableHyperLogLog Externalizable and added Kryo tests
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala4
2 files changed, 10 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
index 28a8accb33..9cfd41407f 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
@@ -17,26 +17,27 @@
package org.apache.spark.util
-import java.io.{ObjectOutputStream, ObjectInputStream}
+import java.io.{Externalizable, ObjectOutput, ObjectInput}
import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
/**
- * A wrapper around com.clearspring.analytics.stream.cardinality.HyperLogLog that is serializable.
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
*/
private[spark]
-class SerializableHyperLogLog(@transient var value: ICardinality) extends Serializable {
+class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
+ def this() = this(null) // For deserialization
def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
- private def readObject(in: ObjectInputStream) {
+ def readExternal(in: ObjectInput) {
val byteLength = in.readInt()
val bytes = new Array[Byte](byteLength)
in.readFully(bytes)
value = HyperLogLog.Builder.build(bytes)
}
- private def writeObject(out: ObjectOutputStream) {
+ def writeExternal(out: ObjectOutput) {
val bytes = value.getBytes()
out.writeInt(bytes.length)
out.write(bytes)
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index c016c51171..18529710fe 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -172,6 +172,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11))
}
+ test("kryo with SerializableHyperLogLog") {
+ assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countDistinct(0.01) === 3)
+ }
+
test("kryo with reduce") {
val control = 1 :: 2 :: Nil
val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))