diff options
author | Sean Zhong <seanzhong@databricks.com> | 2016-09-01 16:31:13 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-09-01 16:31:13 +0800 |
commit | a18c169fd050e71fdb07b153ae0fa5c410d8de27 (patch) | |
tree | be693af0f087329fd457ad0c238ffb2a3a9f8bab /sql/core | |
parent | 536fa911c181958d84f14156f7d57ef5fd68df48 (diff) | |
download | spark-a18c169fd050e71fdb07b153ae0fa5c410d8de27.tar.gz spark-a18c169fd050e71fdb07b153ae0fa5c410d8de27.tar.bz2 spark-a18c169fd050e71fdb07b153ae0fa5c410d8de27.zip |
[SPARK-16283][SQL] Implements percentile_approx aggregation function which supports partial aggregation.
## What changes were proposed in this pull request?
This PR implements aggregation function `percentile_approx`. Function `percentile_approx` returns the approximate percentile(s) of a column at the given percentage(s). A percentile is a watermark value below which a given percentage of the column values fall. For example, the percentile of column `col` at percentage 50% is the median value of column `col`.
### Syntax:
```
# Returns percentile at a given percentage value. The approximation error can be reduced by increasing parameter accuracy, at the cost of memory.
percentile_approx(col, percentage [, accuracy])
# Returns percentile value array at given percentage value array
percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy])
```
### Features:
1. This function supports partial aggregation.
2. The memory consumption is bounded. The larger `accuracy` parameter we choose, we smaller error we get. The default accuracy value is 10000, to match with Hive default setting. Choose a smaller value for smaller memory footprint.
3. This function supports window function aggregation.
### Example usages:
```
## Returns the 25th percentile value, with default accuracy
SELECT percentile_approx(col, 0.25) FROM table
## Returns an array of percentile value (25th, 50th, 75th), with default accuracy
SELECT percentile_approx(col, array(0.25, 0.5, 0.75)) FROM table
## Returns 25th percentile value, with custom accuracy value 100, larger accuracy parameter yields smaller approximation error
SELECT percentile_approx(col, 0.25, 100) FROM table
## Returns the 25th, and 50th percentile values, with custom accuracy value 100
SELECT percentile_approx(col, array(0.25, 0.5), 100) FROM table
```
### NOTE:
1. The `percentile_approx` implementation is different from Hive, so the result returned on same query maybe slightly different with Hive. This implementation uses `QuantileSummaries` as the underlying probabilistic data structure, and mainly follows paper `Space-efficient Online Computation of Quantile Summaries` by Greenwald, Michael and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)`
2. The current implementation of `QuantileSummaries` doesn't support automatic compression. This PR has a rule to do compression automatically at the caller side, but it may not be optimal.
## How was this patch tested?
Unit test, and Sql query test.
## Acknowledgement
1. This PR's work in based on lw-lin's PR https://github.com/apache/spark/pull/14298, with improvements like supporting partial aggregation, fixing out of memory issue.
Author: Sean Zhong <seanzhong@databricks.com>
Closes #14868 from clockfly/appro_percentile_try_2.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala new file mode 100644 index 0000000000..37d7c442bb --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY +import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.PercentileDigest +import org.apache.spark.sql.test.SharedSQLContext + +class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + private val table = "percentile_test" + + test("percentile_approx, single percentile value") { + withTempView(table) { + (1 to 1000).toDF("col").createOrReplaceTempView(table) + checkAnswer( + spark.sql( + s""" + |SELECT + | percentile_approx(col, 0.25), + | percentile_approx(col, 0.5), + | percentile_approx(col, 0.75d), + | percentile_approx(col, 0.0), + | percentile_approx(col, 1.0), + | percentile_approx(col, 0), + | percentile_approx(col, 1) + |FROM $table + """.stripMargin), + Row(250D, 500D, 750D, 1D, 1000D, 1D, 1000D) + ) + } + } + + test("percentile_approx, array of percentile value") { + withTempView(table) { + (1 to 1000).toDF("col").createOrReplaceTempView(table) + checkAnswer( + spark.sql( + s"""SELECT + | percentile_approx(col, array(0.25, 0.5, 0.75D)), + | count(col), + | percentile_approx(col, array(0.0, 1.0)), + | sum(col) + |FROM $table + """.stripMargin), + Row(Seq(250D, 500D, 750D), 1000, Seq(1D, 1000D), 500500) + ) + } + } + + test("percentile_approx, with different accuracies") { + + withTempView(table) { + (1 to 1000).toDF("col").createOrReplaceTempView(table) + + // With different accuracies + val expectedPercentile = 250D + val accuracies = Array(1, 10, 100, 1000, 10000) + val errors = accuracies.map { accuracy => + val df = spark.sql(s"SELECT percentile_approx(col, 0.25, $accuracy) FROM $table") + val approximatePercentile = df.collect().head.getDouble(0) + val error = Math.abs(approximatePercentile - expectedPercentile) + error + } + + // The larger accuracy value we use, the smaller error we get + assert(errors.sorted.sameElements(errors.reverse)) + } + } + + test("percentile_approx, supports constant folding for parameter accuracy and percentages") { + withTempView(table) { + (1 to 1000).toDF("col").createOrReplaceTempView(table) + checkAnswer( + spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 800D) FROM $table"), + Row(Seq(500D)) + ) + } + } + + test("percentile_approx(), aggregation on empty input table, no group by") { + withTempView(table) { + Seq.empty[Int].toDF("col").createOrReplaceTempView(table) + checkAnswer( + spark.sql(s"SELECT sum(col), percentile_approx(col, 0.5) FROM $table"), + Row(null, null) + ) + } + } + + test("percentile_approx(), aggregation on empty input table, with group by") { + withTempView(table) { + Seq.empty[Int].toDF("col").createOrReplaceTempView(table) + checkAnswer( + spark.sql(s"SELECT sum(col), percentile_approx(col, 0.5) FROM $table GROUP BY col"), + Seq.empty[Row] + ) + } + } + + test("percentile_approx(null), aggregation with group by") { + withTempView(table) { + (1 to 1000).map(x => (x % 3, x)).toDF("key", "value").createOrReplaceTempView(table) + checkAnswer( + spark.sql( + s"""SELECT + | key, + | percentile_approx(null, 0.5) + |FROM $table + |GROUP BY key + """.stripMargin), + Seq( + Row(0, null), + Row(1, null), + Row(2, null)) + ) + } + } + + test("percentile_approx(null), aggregation without group by") { + withTempView(table) { + (1 to 1000).map(x => (x % 3, x)).toDF("key", "value").createOrReplaceTempView(table) + checkAnswer( + spark.sql( + s"""SELECT + | percentile_approx(null, 0.5), + | sum(null), + | percentile_approx(null, 0.5) + |FROM $table + """.stripMargin), + Row(null, null, null) + ) + } + } + + test("percentile_approx(col, ...), input rows contains null, with out group by") { + withTempView(table) { + (1 to 1000).map(new Integer(_)).flatMap(Seq(null: Integer, _)).toDF("col") + .createOrReplaceTempView(table) + checkAnswer( + spark.sql( + s"""SELECT + | percentile_approx(col, 0.5), + | sum(null), + | percentile_approx(col, 0.5) + |FROM $table + """.stripMargin), + Row(500D, null, 500D)) + } + } + + test("percentile_approx(col, ...), input rows contains null, with group by") { + withTempView(table) { + val rand = new java.util.Random() + (1 to 1000) + .map(new Integer(_)) + .map(v => (new Integer(v % 2), v)) + // Add some nulls + .flatMap(Seq(_, (null: Integer, null: Integer))) + .toDF("key", "value").createOrReplaceTempView(table) + checkAnswer( + spark.sql( + s"""SELECT + | percentile_approx(value, 0.5), + | sum(value), + | percentile_approx(value, 0.5) + |FROM $table + |GROUP BY key + """.stripMargin), + Seq( + Row(499.0D, 250000, 499.0D), + Row(500.0D, 250500, 500.0D), + Row(null, null, null)) + ) + } + } + + test("percentile_approx(col, ...) works in window function") { + withTempView(table) { + val data = (1 to 10).map(v => (v % 2, v)) + data.toDF("key", "value").createOrReplaceTempView(table) + + val query = spark.sql( + s""" + |SElECT percentile_approx(value, 0.5) + |OVER + | (PARTITION BY key ORDER BY value ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) + | AS percentile + |FROM $table + """.stripMargin) + + val expected = data.groupBy(_._1).toSeq.flatMap { group => + val (key, values) = group + val sortedValues = values.map(_._2).sorted + + var outputRows = Seq.empty[Row] + var i = 0 + + val percentile = new PercentileDigest(1.0 / DEFAULT_PERCENTILE_ACCURACY) + sortedValues.foreach { value => + percentile.add(value) + outputRows :+= Row(percentile.getPercentiles(Array(0.5D)).head) + } + outputRows + } + + checkAnswer(query, expected) + } + } +} |