diff options
author | Cheng Lian <lian@databricks.com> | 2016-01-25 15:05:05 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-25 15:05:05 -0800 |
commit | 6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da (patch) | |
tree | e008931434bb051449db9f7fcc8523bb88060b93 /common/sketch/src/test | |
parent | dcae355c64d7f6fdf61df2feefe464eb96c4cf5e (diff) | |
download | spark-6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da.tar.gz spark-6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da.tar.bz2 spark-6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da.zip |
[SPARK-12934][SQL] Count-min sketch serialization
This PR adds serialization support for `CountMinSketch`.
A version number is included in the serialized binary format so that the format can be versioned.
Author: Cheng Lian <lian@databricks.com>
Closes #10893 from liancheng/cms-serialization.
Diffstat (limited to 'common/sketch/src/test')
-rw-r--r-- | common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala | 47 |
1 file changed, 45 insertions, 2 deletions
diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index ec5b4eddec..b9c7f5c23a 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.util.sketch +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + import scala.reflect.ClassTag import scala.util.Random @@ -29,9 +31,22 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite private val seed = 42 + // Serializes and deserializes a given `CountMinSketch`, then checks whether the deserialized + // version is equivalent to the original one. + private def checkSerDe(sketch: CountMinSketch): Unit = { + val out = new ByteArrayOutputStream() + sketch.writeTo(out) + + val in = new ByteArrayInputStream(out.toByteArray) + val deserialized = CountMinSketch.readFrom(in) + + assert(sketch === deserialized) + } + def testAccuracy[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { test(s"accuracy - $typeName") { - val r = new Random() + // Uses fixed seed to ensure reproducible test execution + val r = new Random(31) val numAllItems = 1000000 val allItems = Array.fill(numAllItems)(itemGenerator(r)) @@ -45,7 +60,10 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite } val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) + checkSerDe(sketch) + sampledItemIndices.foreach(i => sketch.add(allItems(i))) + checkSerDe(sketch) val probCorrect = { val numErrors = allItems.map { item => @@ -66,7 +84,9 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite def testMergeInPlace[T: ClassTag](typeName: String)(itemGenerator: Random => T): Unit = { test(s"mergeInPlace - $typeName") { - val r = new Random() + // Uses fixed seed to 
ensure reproducible test execution + val r = new Random(31) + val numToMerge = 5 val numItemsPerSketch = 100000 val perSketchItems = Array.fill(numToMerge, numItemsPerSketch) { @@ -75,11 +95,16 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite val sketches = perSketchItems.map { items => val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) + checkSerDe(sketch) + items.foreach(sketch.add) + checkSerDe(sketch) + sketch } val mergedSketch = sketches.reduce(_ mergeInPlace _) + checkSerDe(mergedSketch) val expectedSketch = { val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) @@ -109,4 +134,22 @@ class CountMinSketchSuite extends FunSuite { // scalastyle:ignore funsuite testItemType[Long]("Long") { _.nextLong() } testItemType[String]("String") { r => r.nextString(r.nextInt(20)) } + + test("incompatible merge") { + intercept[IncompatibleMergeException] { + CountMinSketch.create(10, 10, 1).mergeInPlace(null) + } + + intercept[IncompatibleMergeException] { + val sketch1 = CountMinSketch.create(10, 20, 1) + val sketch2 = CountMinSketch.create(10, 20, 2) + sketch1.mergeInPlace(sketch2) + } + + intercept[IncompatibleMergeException] { + val sketch1 = CountMinSketch.create(10, 10, 1) + val sketch2 = CountMinSketch.create(10, 20, 2) + sketch1.mergeInPlace(sketch2) + } + } } |