aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorZheng RuiFeng <ruifengz@foxmail.com>2017-03-20 18:25:59 -0700
committerXiao Li <gatorsmile@gmail.com>2017-03-20 18:25:59 -0700
commit10691d36de902e3771af20aed40336b4f99de719 (patch)
treed823a3f7e499b99c1447d0bf6a4102d6ece64a2a /sql/catalyst
parentc2d1761a57f5d175913284533b3d0417e8718688 (diff)
downloadspark-10691d36de902e3771af20aed40336b4f99de719.tar.gz
spark-10691d36de902e3771af20aed40336b4f99de719.tar.bz2
spark-10691d36de902e3771af20aed40336b4f99de719.zip
[SPARK-19573][SQL] Make NaN/null handling consistent in approxQuantile
## What changes were proposed in this pull request?

Update `StatFunctions.multipleApproxQuantiles` to handle NaN/null.

## How was this patch tested?

Existing tests and added tests.

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #16971 from zhengruifeng/quantiles_nan.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala3
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala12
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala46
3 files changed, 40 insertions, 21 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
index db062f1a54..1ec2e4a9e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
@@ -245,7 +245,8 @@ object ApproximatePercentile {
val result = new Array[Double](percentages.length)
var i = 0
while (i < percentages.length) {
- result(i) = summaries.query(percentages(i))
+ // Since summaries.count != 0, the query here never returns None.
+ result(i) = summaries.query(percentages(i)).get
i += 1
}
result
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index 04f4ff2a92..af543b04ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -176,17 +176,19 @@ class QuantileSummaries(
* @param quantile the target quantile
* @return
*/
- def query(quantile: Double): Double = {
+ def query(quantile: Double): Option[Double] = {
require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
require(headSampled.isEmpty,
"Cannot operate on an uncompressed summary, call compress() first")
+ if (sampled.isEmpty) return None
+
if (quantile <= relativeError) {
- return sampled.head.value
+ return Some(sampled.head.value)
}
if (quantile >= 1 - relativeError) {
- return sampled.last.value
+ return Some(sampled.last.value)
}
// Target rank
@@ -200,11 +202,11 @@ class QuantileSummaries(
minRank += curSample.g
val maxRank = minRank + curSample.delta
if (maxRank - targetError <= rank && rank <= minRank + targetError) {
- return curSample.value
+ return Some(curSample.value)
}
i += 1
}
- sampled.last.value
+ Some(sampled.last.value)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
index 5e90970b1b..df579d5ec1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
@@ -55,15 +55,19 @@ class QuantileSummariesSuite extends SparkFunSuite {
}
private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
- val approx = summary.query(quant)
- // The rank of the approximation.
- val rank = data.count(_ < approx) // has to be <, not <= to be exact
- val lower = math.floor((quant - summary.relativeError) * data.size)
- val upper = math.ceil((quant + summary.relativeError) * data.size)
- val msg =
- s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx"
- assert(rank >= lower, msg)
- assert(rank <= upper, msg)
+ if (data.nonEmpty) {
+ val approx = summary.query(quant).get
+ // The rank of the approximation.
+ val rank = data.count(_ < approx) // has to be <, not <= to be exact
+ val lower = math.floor((quant - summary.relativeError) * data.size)
+ val upper = math.ceil((quant + summary.relativeError) * data.size)
+ val msg =
+ s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx"
+ assert(rank >= lower, msg)
+ assert(rank <= upper, msg)
+ } else {
+ assert(summary.query(quant).isEmpty)
+ }
}
for {
@@ -74,9 +78,9 @@ class QuantileSummariesSuite extends SparkFunSuite {
test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
val s = buildSummary(data, epsi, compression)
- val min_approx = s.query(0.0)
+ val min_approx = s.query(0.0).get
assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
- val max_approx = s.query(1.0)
+ val max_approx = s.query(1.0).get
assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
}
@@ -100,6 +104,18 @@ class QuantileSummariesSuite extends SparkFunSuite {
checkQuantile(0.1, data, s)
checkQuantile(0.001, data, s)
}
+
+ test(s"Tests on empty data with epsi=$epsi and seq=$seq_name, compression=$compression") {
+ val emptyData = Seq.empty[Double]
+ val s = buildSummary(emptyData, epsi, compression)
+ assert(s.count == 0, s"Found count=${s.count} but data size=0")
+ assert(s.sampled.isEmpty, s"if QuantileSummaries is empty, sampled should be empty")
+ checkQuantile(0.9999, emptyData, s)
+ checkQuantile(0.9, emptyData, s)
+ checkQuantile(0.5, emptyData, s)
+ checkQuantile(0.1, emptyData, s)
+ checkQuantile(0.001, emptyData, s)
+ }
}
// Tests for merging procedure
@@ -118,9 +134,9 @@ class QuantileSummariesSuite extends SparkFunSuite {
val s1 = buildSummary(data1, epsi, compression)
val s2 = buildSummary(data2, epsi, compression)
val s = s1.merge(s2)
- val min_approx = s.query(0.0)
+ val min_approx = s.query(0.0).get
assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
- val max_approx = s.query(1.0)
+ val max_approx = s.query(1.0).get
assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
checkQuantile(0.9999, data, s)
checkQuantile(0.9, data, s)
@@ -137,9 +153,9 @@ class QuantileSummariesSuite extends SparkFunSuite {
val s1 = buildSummary(data11, epsi, compression)
val s2 = buildSummary(data12, epsi, compression)
val s = s1.merge(s2)
- val min_approx = s.query(0.0)
+ val min_approx = s.query(0.0).get
assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
- val max_approx = s.query(1.0)
+ val max_approx = s.query(1.0).get
assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
checkQuantile(0.9999, data, s)
checkQuantile(0.9, data, s)