aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorTimothy Hunter <timhunter@databricks.com>2016-09-11 08:03:45 +0100
committerSean Owen <sowen@cloudera.com>2016-09-11 08:03:45 +0100
commit180796ecb3a00facde2d98affdb5aa38dd258875 (patch)
treec082d3431c1a266e9526b480188324a19186b7da /sql/catalyst/src
parent29ba9578f44c7caa8451386cee1f03f4e0ed8fc7 (diff)
downloadspark-180796ecb3a00facde2d98affdb5aa38dd258875.tar.gz
spark-180796ecb3a00facde2d98affdb5aa38dd258875.tar.bz2
spark-180796ecb3a00facde2d98affdb5aa38dd258875.zip
[SPARK-17439][SQL] Fixing compression issues with approximate quantiles and adding more tests
## What changes were proposed in this pull request? This PR builds on #14976 and fixes a correctness bug that would cause the wrong quantile to be returned for small target errors. ## How was this patch tested? This PR adds 8 unit tests that were failing without the fix. Author: Timothy Hunter <timhunter@databricks.com> Author: Sean Owen <sowen@cloudera.com> Closes #15002 from thunterdb/ml-1783.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala16
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala29
2 files changed, 39 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index 7512ace188..fd62bd511f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.util
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
@@ -61,7 +61,12 @@ class QuantileSummaries(
def insert(x: Double): QuantileSummaries = {
headSampled += x
if (headSampled.size >= defaultHeadSize) {
- this.withHeadBufferInserted
+ val result = this.withHeadBufferInserted
+ if (result.sampled.length >= compressThreshold) {
+ result.compress()
+ } else {
+ result
+ }
} else {
this
}
@@ -236,7 +241,7 @@ object QuantileSummaries {
if (currentSamples.isEmpty) {
return Array.empty[Stats]
}
- val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+ val res = ListBuffer.empty[Stats]
// Start for the last element, which is always part of the set.
// The head contains the current new head, that may be merged with the current element.
var head = currentSamples.last
@@ -258,7 +263,10 @@ object QuantileSummaries {
}
res.prepend(head)
// If necessary, add the minimum element:
- res.prepend(currentSamples.head)
+ val currHead = currentSamples.head
+ if (currHead.value < head.value) {
+ res.prepend(currentSamples.head)
+ }
res.toArray
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
index 89b2a22a3d..5e90970b1b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
@@ -40,6 +40,20 @@ class QuantileSummariesSuite extends SparkFunSuite {
summary.compress()
}
+ /**
+ * Interleaves compression and insertions.
+ */
+ private def buildCompressSummary(
+ data: Seq[Double],
+ epsi: Double,
+ threshold: Int): QuantileSummaries = {
+ var summary = new QuantileSummaries(threshold, epsi)
+ data.foreach { x =>
+ summary = summary.insert(x).compress()
+ }
+ summary
+ }
+
private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
val approx = summary.query(quant)
// The rank of the approximation.
@@ -54,8 +68,8 @@ class QuantileSummariesSuite extends SparkFunSuite {
for {
(seq_name, data) <- Seq(increasing, decreasing, random)
- epsi <- Seq(0.1, 0.0001)
- compression <- Seq(1000, 10)
+ epsi <- Seq(0.1, 0.0001) // With a significant value and with full precision
+ compression <- Seq(1000, 10) // This interleaves n so that we test without and with compression
} {
test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
@@ -75,6 +89,17 @@ class QuantileSummariesSuite extends SparkFunSuite {
checkQuantile(0.1, data, s)
checkQuantile(0.001, data, s)
}
+
+ test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression " +
+ s"(interleaved)") {
+ val s = buildCompressSummary(data, epsi, compression)
+ assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
+ checkQuantile(0.9999, data, s)
+ checkQuantile(0.9, data, s)
+ checkQuantile(0.5, data, s)
+ checkQuantile(0.1, data, s)
+ checkQuantile(0.001, data, s)
+ }
}
// Tests for merging procedure