diff options
author | Zheng RuiFeng <ruifengz@foxmail.com> | 2017-03-20 18:25:59 -0700 |
---|---|---|
committer | Xiao Li <gatorsmile@gmail.com> | 2017-03-20 18:25:59 -0700 |
commit | 10691d36de902e3771af20aed40336b4f99de719 (patch) | |
tree | d823a3f7e499b99c1447d0bf6a4102d6ece64a2a /sql/catalyst | |
parent | c2d1761a57f5d175913284533b3d0417e8718688 (diff) | |
download | spark-10691d36de902e3771af20aed40336b4f99de719.tar.gz spark-10691d36de902e3771af20aed40336b4f99de719.tar.bz2 spark-10691d36de902e3771af20aed40336b4f99de719.zip |
[SPARK-19573][SQL] Make NaN/null handling consistent in approxQuantile
## What changes were proposed in this pull request?
Update `StatFunctions.multipleApproxQuantiles` to handle NaN/null.
## How was this patch tested?
Existing tests and added tests.
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes #16971 from zhengruifeng/quantiles_nan.
Diffstat (limited to 'sql/catalyst')
3 files changed, 40 insertions, 21 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala index db062f1a54..1ec2e4a9e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala @@ -245,7 +245,8 @@ object ApproximatePercentile { val result = new Array[Double](percentages.length) var i = 0 while (i < percentages.length) { - result(i) = summaries.query(percentages(i)) + // Since summaries.count != 0, the query here never return None. + result(i) = summaries.query(percentages(i)).get i += 1 } result diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala index 04f4ff2a92..af543b04ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala @@ -176,17 +176,19 @@ class QuantileSummaries( * @param quantile the target quantile * @return */ - def query(quantile: Double): Double = { + def query(quantile: Double): Option[Double] = { require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]") require(headSampled.isEmpty, "Cannot operate on an uncompressed summary, call compress() first") + if (sampled.isEmpty) return None + if (quantile <= relativeError) { - return sampled.head.value + return Some(sampled.head.value) } if (quantile >= 1 - relativeError) { - return sampled.last.value + return Some(sampled.last.value) } // Target rank @@ -200,11 +202,11 @@ class QuantileSummaries( minRank += curSample.g val maxRank = minRank + curSample.delta if (maxRank - targetError <= rank 
&& rank <= minRank + targetError) { - return curSample.value + return Some(curSample.value) } i += 1 } - sampled.last.value + Some(sampled.last.value) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala index 5e90970b1b..df579d5ec1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala @@ -55,15 +55,19 @@ class QuantileSummariesSuite extends SparkFunSuite { } private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = { - val approx = summary.query(quant) - // The rank of the approximation. - val rank = data.count(_ < approx) // has to be <, not <= to be exact - val lower = math.floor((quant - summary.relativeError) * data.size) - val upper = math.ceil((quant + summary.relativeError) * data.size) - val msg = - s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx" - assert(rank >= lower, msg) - assert(rank <= upper, msg) + if (data.nonEmpty) { + val approx = summary.query(quant).get + // The rank of the approximation. 
+ val rank = data.count(_ < approx) // has to be <, not <= to be exact + val lower = math.floor((quant - summary.relativeError) * data.size) + val upper = math.ceil((quant + summary.relativeError) * data.size) + val msg = + s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx" + assert(rank >= lower, msg) + assert(rank <= upper, msg) + } else { + assert(summary.query(quant).isEmpty) + } } for { @@ -74,9 +78,9 @@ class QuantileSummariesSuite extends SparkFunSuite { test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") { val s = buildSummary(data, epsi, compression) - val min_approx = s.query(0.0) + val min_approx = s.query(0.0).get assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") - val max_approx = s.query(1.0) + val max_approx = s.query(1.0).get assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") } @@ -100,6 +104,18 @@ class QuantileSummariesSuite extends SparkFunSuite { checkQuantile(0.1, data, s) checkQuantile(0.001, data, s) } + + test(s"Tests on empty data with epsi=$epsi and seq=$seq_name, compression=$compression") { + val emptyData = Seq.empty[Double] + val s = buildSummary(emptyData, epsi, compression) + assert(s.count == 0, s"Found count=${s.count} but data size=0") + assert(s.sampled.isEmpty, s"if QuantileSummaries is empty, sampled should be empty") + checkQuantile(0.9999, emptyData, s) + checkQuantile(0.9, emptyData, s) + checkQuantile(0.5, emptyData, s) + checkQuantile(0.1, emptyData, s) + checkQuantile(0.001, emptyData, s) + } } // Tests for merging procedure @@ -118,9 +134,9 @@ class QuantileSummariesSuite extends SparkFunSuite { val s1 = buildSummary(data1, epsi, compression) val s2 = buildSummary(data2, epsi, compression) val s = s1.merge(s2) - val min_approx = s.query(0.0) + val min_approx = s.query(0.0).get assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") - val 
max_approx = s.query(1.0) + val max_approx = s.query(1.0).get assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") checkQuantile(0.9999, data, s) checkQuantile(0.9, data, s) @@ -137,9 +153,9 @@ class QuantileSummariesSuite extends SparkFunSuite { val s1 = buildSummary(data11, epsi, compression) val s2 = buildSummary(data12, epsi, compression) val s = s1.merge(s2) - val min_approx = s.query(0.0) + val min_approx = s.query(0.0).get assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") - val max_approx = s.query(1.0) + val max_approx = s.query(1.0).get assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") checkQuantile(0.9999, data, s) checkQuantile(0.9, data, s) |