From 180796ecb3a00facde2d98affdb5aa38dd258875 Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Sun, 11 Sep 2016 08:03:45 +0100 Subject: [SPARK-17439][SQL] Fixing compression issues with approximate quantiles and adding more tests ## What changes were proposed in this pull request? This PR build on #14976 and fixes a correctness bug that would cause the wrong quantile to be returned for small target errors. ## How was this patch tested? This PR adds 8 unit tests that were failing without the fix. Author: Timothy Hunter Author: Sean Owen Closes #15002 from thunterdb/ml-1783. --- .../sql/catalyst/util/QuantileSummaries.scala | 16 +++++++++--- .../sql/catalyst/util/QuantileSummariesSuite.scala | 29 ++++++++++++++++++++-- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala index 7512ace188..fd62bd511f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.util -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats @@ -61,7 +61,12 @@ class QuantileSummaries( def insert(x: Double): QuantileSummaries = { headSampled += x if (headSampled.size >= defaultHeadSize) { - this.withHeadBufferInserted + val result = this.withHeadBufferInserted + if (result.sampled.length >= compressThreshold) { + result.compress() + } else { + result + } } else { this } @@ -236,7 +241,7 @@ object QuantileSummaries { if (currentSamples.isEmpty) { return Array.empty[Stats] } - val res: ArrayBuffer[Stats] = ArrayBuffer.empty + val res = ListBuffer.empty[Stats] // Start for the last element, which is always part of the set. // The head contains the current new head, that may be merged with the current element. var head = currentSamples.last @@ -258,7 +263,10 @@ object QuantileSummaries { } res.prepend(head) // If necessary, add the minimum element: - res.prepend(currentSamples.head) + val currHead = currentSamples.head + if (currHead.value < head.value) { + res.prepend(currentSamples.head) + } res.toArray } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala index 89b2a22a3d..5e90970b1b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala @@ -40,6 +40,20 @@ class QuantileSummariesSuite extends SparkFunSuite { summary.compress() } + /** + * Interleaves compression and insertions. + */ + private def buildCompressSummary( + data: Seq[Double], + epsi: Double, + threshold: Int): QuantileSummaries = { + var summary = new QuantileSummaries(threshold, epsi) + data.foreach { x => + summary = summary.insert(x).compress() + } + summary + } + private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = { val approx = summary.query(quant) // The rank of the approximation. @@ -54,8 +68,8 @@ class QuantileSummariesSuite extends SparkFunSuite { for { (seq_name, data) <- Seq(increasing, decreasing, random) - epsi <- Seq(0.1, 0.0001) - compression <- Seq(1000, 10) + epsi <- Seq(0.1, 0.0001) // With a significant value and with full precision + compression <- Seq(1000, 10) // This interleaves n so that we test without and with compression } { test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") { @@ -75,6 +89,17 @@ class QuantileSummariesSuite extends SparkFunSuite { checkQuantile(0.1, data, s) checkQuantile(0.001, data, s) } + + test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression " + + s"(interleaved)") { + val s = buildCompressSummary(data, epsi, compression) + assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}") + checkQuantile(0.9999, data, s) + checkQuantile(0.9, data, s) + checkQuantile(0.5, data, s) + checkQuantile(0.1, data, s) + checkQuantile(0.001, data, s) + } } // Tests for merging procedure -- cgit v1.2.3