aboutsummaryrefslogtreecommitdiff
path: root/common/sketch/src/test
diff options
context:
space:
mode:
author Cheng Lian <lian@databricks.com> 2016-01-25 15:05:05 -0800
committer Reynold Xin <rxin@databricks.com> 2016-01-25 15:05:05 -0800
commit 6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da (patch)
tree e008931434bb051449db9f7fcc8523bb88060b93 /common/sketch/src/test
parent dcae355c64d7f6fdf61df2feefe464eb96c4cf5e (diff)
download spark-6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da.tar.gz
spark-6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da.tar.bz2
spark-6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da.zip
[SPARK-12934][SQL] Count-min sketch serialization
This PR adds serialization support for `CountMinSketch`. A version number is added to version the serialized binary format. Author: Cheng Lian <lian@databricks.com> Closes #10893 from liancheng/cms-serialization.
Diffstat (limited to 'common/sketch/src/test')
-rw-r--r-- common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala 47
1 files changed, 45 insertions, 2 deletions
diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
index ec5b4eddec..b9c7f5c23a 100644
--- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
+++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util.sketch
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
import scala.reflect.ClassTag
import scala.util.Random
@@ -29,9 +31,22 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite
private val seed = 42
+  /**
+   * Round-trips `sketch` through its serialized form using in-memory byte streams and asserts
+   * that the sketch read back is equal (`===`) to the one that was written.
+   */
+  private def checkSerDe(sketch: CountMinSketch): Unit = {
+    val buffer = new ByteArrayOutputStream()
+    sketch.writeTo(buffer)
+
+    // Read back from exactly the bytes that were just written.
+    val roundTripped = CountMinSketch.readFrom(new ByteArrayInputStream(buffer.toByteArray))
+    assert(sketch === roundTripped)
+  }
+
def testAccuracy[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = {
test(s"accuracy - $typeName") {
- val r = new Random()
+ // Uses fixed seed to ensure reproducible test execution
+ val r = new Random(31)
val numAllItems = 1000000
val allItems = Array.fill(numAllItems)(itemGenerator(r))
@@ -45,7 +60,10 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite
}
val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed)
+ checkSerDe(sketch)
+
sampledItemIndices.foreach(i => sketch.add(allItems(i)))
+ checkSerDe(sketch)
val probCorrect = {
val numErrors = allItems.map { item =>
@@ -66,7 +84,9 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite
def testMergeInPlace[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = {
test(s"mergeInPlace - $typeName") {
- val r = new Random()
+ // Uses fixed seed to ensure reproducible test execution
+ val r = new Random(31)
+
val numToMerge = 5
val numItemsPerSketch = 100000
val perSketchItems = Array.fill(numToMerge, numItemsPerSketch) {
@@ -75,11 +95,16 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite
val sketches = perSketchItems.map { items =>
val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed)
+ checkSerDe(sketch)
+
items.foreach(sketch.add)
+ checkSerDe(sketch)
+
sketch
}
val mergedSketch = sketches.reduce(_ mergeInPlace _)
+ checkSerDe(mergedSketch)
val expectedSketch = {
val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed)
@@ -109,4 +134,22 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite
testItemType[Long]("Long") { _.nextLong() }
testItemType[String]("String") { r => r.nextString(r.nextInt(20)) }
+
+  test("incompatible merge") {
+    // Case 1: merging with `null` must be rejected.
+    intercept[IncompatibleMergeException] {
+      val sketch = CountMinSketch.create(10, 10, 1)
+      sketch.mergeInPlace(null)
+    }
+
+    // Case 2: the two sketches differ only in the third `create` argument
+    // (presumably the seed -- confirm against `CountMinSketch.create`).
+    intercept[IncompatibleMergeException] {
+      CountMinSketch.create(10, 20, 1).mergeInPlace(CountMinSketch.create(10, 20, 2))
+    }
+
+    // Case 3: the two sketches differ in both the second and the third `create` argument.
+    intercept[IncompatibleMergeException] {
+      CountMinSketch.create(10, 10, 1).mergeInPlace(CountMinSketch.create(10, 20, 2))
+    }
+  }
}