author     Sean Zhong <seanzhong@databricks.com>    2016-09-01 16:31:13 +0800
committer  Wenchen Fan <wenchen@databricks.com>     2016-09-01 16:31:13 +0800
commit     a18c169fd050e71fdb07b153ae0fa5c410d8de27
tree       be693af0f087329fd457ad0c238ffb2a3a9f8bab /sql/core
parent     536fa911c181958d84f14156f7d57ef5fd68df48
[SPARK-16283][SQL] Implements percentile_approx aggregation function which supports partial aggregation.
## What changes were proposed in this pull request?

This PR implements the aggregation function `percentile_approx`. Function `percentile_approx` returns the approximate percentile(s) of a column at the given percentage(s). A percentile is a watermark value below which a given percentage of the column values fall. For example, the percentile of column `col` at percentage 50% is the median value of column `col`.

### Syntax:
```
# Returns the percentile at a given percentage value. The approximation error can be reduced by increasing the accuracy parameter, at the cost of memory.
percentile_approx(col, percentage [, accuracy])

# Returns an array of percentile values at the given array of percentage values
percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy])
```

### Features:
1. This function supports partial aggregation.
2. The memory consumption is bounded. The larger the `accuracy` parameter we choose, the smaller the error we get. The default accuracy value is 10000, to match the Hive default setting. Choose a smaller value for a smaller memory footprint.
3. This function supports window function aggregation.

### Example usages:
```
## Returns the 25th percentile value, with default accuracy
SELECT percentile_approx(col, 0.25) FROM table

## Returns an array of percentile values (25th, 50th, 75th), with default accuracy
SELECT percentile_approx(col, array(0.25, 0.5, 0.75)) FROM table

## Returns the 25th percentile value with a custom accuracy of 100; a larger accuracy parameter yields a smaller approximation error
SELECT percentile_approx(col, 0.25, 100) FROM table

## Returns the 25th and 50th percentile values with a custom accuracy of 100
SELECT percentile_approx(col, array(0.25, 0.5), 100) FROM table
```

### NOTE:
1. The `percentile_approx` implementation is different from Hive's, so the result returned for the same query may be slightly different from Hive's. This implementation uses `QuantileSummaries` as the underlying probabilistic data structure, and mainly follows the paper `Space-efficient Online Computation of Quantile Summaries` by Greenwald, Michael and Khanna, Sanjeev (http://dx.doi.org/10.1145/375663.375670).
2. The current implementation of `QuantileSummaries` doesn't support automatic compression. This PR adds a rule to do compression automatically on the caller side, but it may not be optimal.

## How was this patch tested?

Unit tests and SQL query tests.

## Acknowledgement

1. This PR's work is based on lw-lin's PR https://github.com/apache/spark/pull/14298, with improvements such as supporting partial aggregation and fixing an out-of-memory issue.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #14868 from clockfly/appro_percentile_try_2.
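For reference, the same function can be invoked from the DataFrame API through `expr`. This is a minimal sketch, not part of the patch: it assumes a running `SparkSession` named `spark`, and the column name and accuracy values are illustrative only.

```scala
import org.apache.spark.sql.functions.expr

// Example data: the integers 1 to 1000 in a single column named "col".
val df = spark.range(1, 1001).toDF("col")

// Approximate median with the default accuracy (10000).
df.select(expr("percentile_approx(col, 0.5)")).show()

// 25th/50th/75th percentiles with an explicit accuracy of 100; a smaller
// accuracy trades approximation error for a smaller memory footprint.
df.select(expr("percentile_approx(col, array(0.25, 0.5, 0.75), 100)")).show()
```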
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala  226
1 file changed, 226 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
new file mode 100644
index 0000000000..37d7c442bb
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY
+import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.PercentileDigest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
+ import testImplicits._
+
+ private val table = "percentile_test"
+
+ test("percentile_approx, single percentile value") {
+ withTempView(table) {
+ (1 to 1000).toDF("col").createOrReplaceTempView(table)
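+ // Percentages may be written as decimal or integer literals; 0.0/0 returns the minimum and 1.0/1 the maximum.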
+ checkAnswer(
+ spark.sql(
+ s"""
+ |SELECT
+ | percentile_approx(col, 0.25),
+ | percentile_approx(col, 0.5),
+ | percentile_approx(col, 0.75d),
+ | percentile_approx(col, 0.0),
+ | percentile_approx(col, 1.0),
+ | percentile_approx(col, 0),
+ | percentile_approx(col, 1)
+ |FROM $table
+ """.stripMargin),
+ Row(250D, 500D, 750D, 1D, 1000D, 1D, 1000D)
+ )
+ }
+ }
+
+ test("percentile_approx, array of percentile value") {
+ withTempView(table) {
+ (1 to 1000).toDF("col").createOrReplaceTempView(table)
+ checkAnswer(
+ spark.sql(
+ s"""SELECT
+ | percentile_approx(col, array(0.25, 0.5, 0.75D)),
+ | count(col),
+ | percentile_approx(col, array(0.0, 1.0)),
+ | sum(col)
+ |FROM $table
+ """.stripMargin),
+ Row(Seq(250D, 500D, 750D), 1000, Seq(1D, 1000D), 500500)
+ )
+ }
+ }
+
+ test("percentile_approx, with different accuracies") {
+
+ withTempView(table) {
+ (1 to 1000).toDF("col").createOrReplaceTempView(table)
+
+ // With different accuracies
+ val expectedPercentile = 250D
+ val accuracies = Array(1, 10, 100, 1000, 10000)
+ val errors = accuracies.map { accuracy =>
+ val df = spark.sql(s"SELECT percentile_approx(col, 0.25, $accuracy) FROM $table")
+ val approximatePercentile = df.collect().head.getDouble(0)
+ val error = Math.abs(approximatePercentile - expectedPercentile)
+ error
+ }
+
+ // The larger accuracy value we use, the smaller error we get
+ assert(errors.sorted.sameElements(errors.reverse))
+ }
+ }
+
+ test("percentile_approx, supports constant folding for parameter accuracy and percentages") {
+ withTempView(table) {
+ (1 to 1000).toDF("col").createOrReplaceTempView(table)
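+ // Both the percentage array and the accuracy argument are constant expressions, folded at analysis time.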
+ checkAnswer(
+ spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 800D) FROM $table"),
+ Row(Seq(500D))
+ )
+ }
+ }
+
+ test("percentile_approx(), aggregation on empty input table, no group by") {
+ withTempView(table) {
+ Seq.empty[Int].toDF("col").createOrReplaceTempView(table)
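+ // Without GROUP BY, aggregating an empty table yields a single all-null row.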
+ checkAnswer(
+ spark.sql(s"SELECT sum(col), percentile_approx(col, 0.5) FROM $table"),
+ Row(null, null)
+ )
+ }
+ }
+
+ test("percentile_approx(), aggregation on empty input table, with group by") {
+ withTempView(table) {
+ Seq.empty[Int].toDF("col").createOrReplaceTempView(table)
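+ // With GROUP BY, aggregating an empty table yields no rows at all.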
+ checkAnswer(
+ spark.sql(s"SELECT sum(col), percentile_approx(col, 0.5) FROM $table GROUP BY col"),
+ Seq.empty[Row]
+ )
+ }
+ }
+
+ test("percentile_approx(null), aggregation with group by") {
+ withTempView(table) {
+ (1 to 1000).map(x => (x % 3, x)).toDF("key", "value").createOrReplaceTempView(table)
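+ // A literal null input aggregates to null for every group.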
+ checkAnswer(
+ spark.sql(
+ s"""SELECT
+ | key,
+ | percentile_approx(null, 0.5)
+ |FROM $table
+ |GROUP BY key
+ """.stripMargin),
+ Seq(
+ Row(0, null),
+ Row(1, null),
+ Row(2, null))
+ )
+ }
+ }
+
+ test("percentile_approx(null), aggregation without group by") {
+ withTempView(table) {
+ (1 to 1000).map(x => (x % 3, x)).toDF("key", "value").createOrReplaceTempView(table)
+ checkAnswer(
+ spark.sql(
+ s"""SELECT
+ | percentile_approx(null, 0.5),
+ | sum(null),
+ | percentile_approx(null, 0.5)
+ |FROM $table
+ """.stripMargin),
+ Row(null, null, null)
+ )
+ }
+ }
+
+ test("percentile_approx(col, ...), input rows contains null, with out group by") {
+ withTempView(table) {
+ (1 to 1000).map(new Integer(_)).flatMap(Seq(null: Integer, _)).toDF("col")
+ .createOrReplaceTempView(table)
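+ // Null input rows are ignored, so the median is computed over the non-null values 1 to 1000.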
+ checkAnswer(
+ spark.sql(
+ s"""SELECT
+ | percentile_approx(col, 0.5),
+ | sum(null),
+ | percentile_approx(col, 0.5)
+ |FROM $table
+ """.stripMargin),
+ Row(500D, null, 500D))
+ }
+ }
+
+ test("percentile_approx(col, ...), input rows contains null, with group by") {
+ withTempView(table) {
+ (1 to 1000)
+ .map(new Integer(_))
+ .map(v => (new Integer(v % 2), v))
+ // Add some nulls
+ .flatMap(Seq(_, (null: Integer, null: Integer)))
+ .toDF("key", "value").createOrReplaceTempView(table)
+ checkAnswer(
+ spark.sql(
+ s"""SELECT
+ | percentile_approx(value, 0.5),
+ | sum(value),
+ | percentile_approx(value, 0.5)
+ |FROM $table
+ |GROUP BY key
+ """.stripMargin),
+ Seq(
+ Row(499.0D, 250000, 499.0D),
+ Row(500.0D, 250500, 500.0D),
+ Row(null, null, null))
+ )
+ }
+ }
+
+ test("percentile_approx(col, ...) works in window function") {
+ withTempView(table) {
+ val data = (1 to 10).map(v => (v % 2, v))
+ data.toDF("key", "value").createOrReplaceTempView(table)
+
+ val query = spark.sql(
+ s"""
+ |SELECT percentile_approx(value, 0.5)
+ |OVER
+ | (PARTITION BY key ORDER BY value ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
+ | AS percentile
+ |FROM $table
+ """.stripMargin)
+
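+ // Build the expected running median per partition with PercentileDigest directly,
+ // mirroring the cumulative ROWS frame in the window query above.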
+ val expected = data.groupBy(_._1).toSeq.flatMap { group =>
+ val (key, values) = group
+ val sortedValues = values.map(_._2).sorted
+
+ var outputRows = Seq.empty[Row]
+
+ val percentile = new PercentileDigest(1.0 / DEFAULT_PERCENTILE_ACCURACY)
+ sortedValues.foreach { value =>
+ percentile.add(value)
+ outputRows :+= Row(percentile.getPercentiles(Array(0.5D)).head)
+ }
+ outputRows
+ }
+
+ checkAnswer(query, expected)
+ }
+ }
+}