From cc33460a51d2890fe8f50f5b6b87003d6d210f04 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Tue, 23 Aug 2016 14:57:00 +0800 Subject: [SPARK-17188][SQL] Moves class QuantileSummaries to project catalyst for implementing percentile_approx ## What changes were proposed in this pull request? This is a sub-task of [SPARK-16283](https://issues.apache.org/jira/browse/SPARK-16283) (Implement percentile_approx SQL function), which moves class QuantileSummaries to project catalyst so that it can be reused when implementing aggregation function `percentile_approx`. ## How was this patch tested? This PR only does class relocation, class implementation is not changed. Author: Sean Zhong Closes #14754 from clockfly/move_QuantileSummaries_to_catalyst. --- .../sql/catalyst/util/QuantileSummaries.scala | 264 +++++++++++++++++++++ .../sql/catalyst/util/QuantileSummariesSuite.scala | 126 ++++++++++ .../spark/sql/execution/stat/StatFunctions.scala | 247 +------------------ .../sql/execution/stat/ApproxQuantileSuite.scala | 129 ---------- 4 files changed, 391 insertions(+), 375 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala new file mode 100644 index 0000000000..493b5faf9e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats + +/** + * Helper class to compute approximate quantile summary. + * This implementation is based on the algorithm proposed in the paper: + * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael + * and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670) + * + * In order to optimize for speed, it maintains an internal buffer of the last seen samples, + * and only inserts them after crossing a certain size threshold. This guarantees a near-constant + * runtime complexity compared to the original algorithm. + * + * @param compressThreshold the compression threshold. + * After the internal buffer of statistics crosses this size, it attempts to compress the + * statistics together. + * @param relativeError the target relative error. + * It is uniform across the complete range of values. + * @param sampled a buffer of quantile statistics. + * See the G-K article for more details. + * @param count the count of all the elements *inserted in the sampled buffer* + * (excluding the head buffer) + */ +class QuantileSummaries( + val compressThreshold: Int, + val relativeError: Double, + val sampled: Array[Stats] = Array.empty, + val count: Long = 0L) extends Serializable { + + // a buffer of latest samples seen so far + private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty + + import QuantileSummaries._ + + /** + * Returns a summary with the given observation inserted into the summary. + * This method may either modify in place the current summary (and return the same summary, + * modified in place), or it may create a new summary from scratch it necessary. + * @param x the new observation to insert into the summary + */ + def insert(x: Double): QuantileSummaries = { + headSampled.append(x) + if (headSampled.size >= defaultHeadSize) { + this.withHeadBufferInserted + } else { + this + } + } + + /** + * Inserts an array of (unsorted samples) in a batch, sorting the array first to traverse + * the summary statistics in a single batch. + * + * This method does not modify the current object and returns if necessary a new copy. + * + * @return a new quantile summary object. + */ + private def withHeadBufferInserted: QuantileSummaries = { + if (headSampled.isEmpty) { + return this + } + var currentCount = count + val sorted = headSampled.toArray.sorted + val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]() + // The index of the next element to insert + var sampleIdx = 0 + // The index of the sample currently being inserted. + var opsIdx: Int = 0 + while(opsIdx < sorted.length) { + val currentSample = sorted(opsIdx) + // Add all the samples before the next observation. + while(sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) { + newSamples.append(sampled(sampleIdx)) + sampleIdx += 1 + } + + // If it is the first one to insert, of if it is the last one + currentCount += 1 + val delta = + if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) { + 0 + } else { + math.floor(2 * relativeError * currentCount).toInt + } + + val tuple = Stats(currentSample, 1, delta) + newSamples.append(tuple) + opsIdx += 1 + } + + // Add all the remaining existing samples + while(sampleIdx < sampled.size) { + newSamples.append(sampled(sampleIdx)) + sampleIdx += 1 + } + new QuantileSummaries(compressThreshold, relativeError, newSamples.toArray, currentCount) + } + + /** + * Returns a new summary that compresses the summary statistics and the head buffer. + * + * This implements the COMPRESS function of the GK algorithm. It does not modify the object. + * + * @return a new summary object with compressed statistics + */ + def compress(): QuantileSummaries = { + // Inserts all the elements first + val inserted = this.withHeadBufferInserted + assert(inserted.headSampled.isEmpty) + assert(inserted.count == count + headSampled.size) + val compressed = + compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count) + new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count) + } + + private def shallowCopy: QuantileSummaries = { + new QuantileSummaries(compressThreshold, relativeError, sampled, count) + } + + /** + * Merges two (compressed) summaries together. + * + * Returns a new summary. + */ + def merge(other: QuantileSummaries): QuantileSummaries = { + require(headSampled.isEmpty, "Current buffer needs to be compressed before merge") + require(other.headSampled.isEmpty, "Other buffer needs to be compressed before merge") + if (other.count == 0) { + this.shallowCopy + } else if (count == 0) { + other.shallowCopy + } else { + // Merge the two buffers. + // The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the + // statistics during the merging: the invariants are still respected after the merge. + // TODO: could replace full sort by ordered merge, the two lists are known to be sorted + // already. + val res = (sampled ++ other.sampled).sortBy(_.value) + val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count) + new QuantileSummaries( + other.compressThreshold, other.relativeError, comp, other.count + count) + } + } + + /** + * Runs a query for a given quantile. + * The result follows the approximation guarantees detailed above. + * The query can only be run on a compressed summary: you need to call compress() before using + * it. + * + * @param quantile the target quantile + * @return + */ + def query(quantile: Double): Double = { + require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]") + require(headSampled.isEmpty, + "Cannot operate on an uncompressed summary, call compress() first") + + if (quantile <= relativeError) { + return sampled.head.value + } + + if (quantile >= 1 - relativeError) { + return sampled.last.value + } + + // Target rank + val rank = math.ceil(quantile * count).toInt + val targetError = math.ceil(relativeError * count) + // Minimum rank at current sample + var minRank = 0 + var i = 1 + while (i < sampled.size - 1) { + val curSample = sampled(i) + minRank += curSample.g + val maxRank = minRank + curSample.delta + if (maxRank - targetError <= rank && rank <= minRank + targetError) { + return curSample.value + } + i += 1 + } + sampled.last.value + } +} + +object QuantileSummaries { + // TODO(tjhunter) more tuning could be done one the constants here, but for now + // the main cost of the algorithm is accessing the data in SQL. + /** + * The default value for the compression threshold. + */ + val defaultCompressThreshold: Int = 10000 + + /** + * The size of the head buffer. + */ + val defaultHeadSize: Int = 50000 + + /** + * The default value for the relative error (1%). + * With this value, the best extreme percentiles that can be approximated are 1% and 99%. + */ + val defaultRelativeError: Double = 0.01 + + /** + * Statistics from the Greenwald-Khanna paper. + * @param value the sampled value + * @param g the minimum rank jump from the previous value's minimum rank + * @param delta the maximum span of the rank. + */ + case class Stats(value: Double, g: Int, delta: Int) + + private def compressImmut( + currentSamples: IndexedSeq[Stats], + mergeThreshold: Double): Array[Stats] = { + if (currentSamples.isEmpty) { + return Array.empty[Stats] + } + val res: ArrayBuffer[Stats] = ArrayBuffer.empty + // Start for the last element, which is always part of the set. + // The head contains the current new head, that may be merged with the current element. + var head = currentSamples.last + var i = currentSamples.size - 2 + // Do not compress the last element + while (i >= 1) { + // The current sample: + val sample1 = currentSamples(i) + // Do we need to compress? + if (sample1.g + head.g + head.delta < mergeThreshold) { + // Do not insert yet, just merge the current element into the head. + head = head.copy(g = head.g + sample1.g) + } else { + // Prepend the current head, and keep the current sample as target for merging. + res.prepend(head) + head = sample1 + } + i -= 1 + } + res.prepend(head) + // If necessary, add the minimum element: + res.prepend(currentSamples.head) + res.toArray + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala new file mode 100644 index 0000000000..89b2a22a3d --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.util.Random + +import org.apache.spark.SparkFunSuite + +class QuantileSummariesSuite extends SparkFunSuite { + + private val r = new Random(1) + private val n = 100 + private val increasing = "increasing" -> (0 until n).map(_.toDouble) + private val decreasing = "decreasing" -> (n until 0 by -1).map(_.toDouble) + private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000)) + + private def buildSummary( + data: Seq[Double], + epsi: Double, + threshold: Int): QuantileSummaries = { + var summary = new QuantileSummaries(threshold, epsi) + data.foreach { x => + summary = summary.insert(x) + } + summary.compress() + } + + private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = { + val approx = summary.query(quant) + // The rank of the approximation. + val rank = data.count(_ < approx) // has to be <, not <= to be exact + val lower = math.floor((quant - summary.relativeError) * data.size) + val upper = math.ceil((quant + summary.relativeError) * data.size) + val msg = + s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx" + assert(rank >= lower, msg) + assert(rank <= upper, msg) + } + + for { + (seq_name, data) <- Seq(increasing, decreasing, random) + epsi <- Seq(0.1, 0.0001) + compression <- Seq(1000, 10) + } { + + test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") { + val s = buildSummary(data, epsi, compression) + val min_approx = s.query(0.0) + assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") + val max_approx = s.query(1.0) + assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") + } + + test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") { + val s = buildSummary(data, epsi, compression) + assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}") + checkQuantile(0.9999, data, s) + checkQuantile(0.9, data, s) + checkQuantile(0.5, data, s) + checkQuantile(0.1, data, s) + checkQuantile(0.001, data, s) + } + } + + // Tests for merging procedure + for { + (seq_name, data) <- Seq(increasing, decreasing, random) + epsi <- Seq(0.1, 0.0001) + compression <- Seq(1000, 10) + } { + + val (data1, data2) = { + val l = data.size + data.take(l / 2) -> data.drop(l / 2) + } + + test(s"Merging ordered lists with epsi=$epsi and seq=$seq_name, compression=$compression") { + val s1 = buildSummary(data1, epsi, compression) + val s2 = buildSummary(data2, epsi, compression) + val s = s1.merge(s2) + val min_approx = s.query(0.0) + assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") + val max_approx = s.query(1.0) + assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") + checkQuantile(0.9999, data, s) + checkQuantile(0.9, data, s) + checkQuantile(0.5, data, s) + checkQuantile(0.1, data, s) + checkQuantile(0.001, data, s) + } + + val (data11, data12) = { + data.sliding(2).map(_.head).toSeq -> data.sliding(2).map(_.last).toSeq + } + + test(s"Merging interleaved lists with epsi=$epsi and seq=$seq_name, compression=$compression") { + val s1 = buildSummary(data11, epsi, compression) + val s2 = buildSummary(data12, epsi, compression) + val s = s1.merge(s2) + val min_approx = s.query(0.0) + assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") + val max_approx = s.query(1.0) + assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") + checkQuantile(0.9999, data, s) + checkQuantile(0.9, data, s) + checkQuantile(0.5, data, s) + checkQuantile(0.1, data, s) + checkQuantile(0.001, data, s) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 7c58c4897f..822f49ecab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -17,20 +17,17 @@ package org.apache.spark.sql.execution.stat -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.internal.Logging import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.util.QuantileSummaries import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String object StatFunctions extends Logging { - import QuantileSummaries.Stats - /** * Calculates the approximate quantiles of multiple numerical columns of a DataFrame in one pass. * @@ -95,248 +92,6 @@ object StatFunctions extends Logging { summaries.map { summary => probabilities.map(summary.query) } } - /** - * Helper class to compute approximate quantile summary. - * This implementation is based on the algorithm proposed in the paper: - * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael - * and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670) - * - * In order to optimize for speed, it maintains an internal buffer of the last seen samples, - * and only inserts them after crossing a certain size threshold. This guarantees a near-constant - * runtime complexity compared to the original algorithm. - * - * @param compressThreshold the compression threshold. - * After the internal buffer of statistics crosses this size, it attempts to compress the - * statistics together. - * @param relativeError the target relative error. - * It is uniform across the complete range of values. - * @param sampled a buffer of quantile statistics. - * See the G-K article for more details. - * @param count the count of all the elements *inserted in the sampled buffer* - * (excluding the head buffer) - */ - class QuantileSummaries( - val compressThreshold: Int, - val relativeError: Double, - val sampled: Array[Stats] = Array.empty, - val count: Long = 0L) extends Serializable { - - // a buffer of latest samples seen so far - private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty - - import QuantileSummaries._ - - /** - * Returns a summary with the given observation inserted into the summary. - * This method may either modify in place the current summary (and return the same summary, - * modified in place), or it may create a new summary from scratch it necessary. - * @param x the new observation to insert into the summary - */ - def insert(x: Double): QuantileSummaries = { - headSampled.append(x) - if (headSampled.size >= defaultHeadSize) { - this.withHeadBufferInserted - } else { - this - } - } - - /** - * Inserts an array of (unsorted samples) in a batch, sorting the array first to traverse - * the summary statistics in a single batch. - * - * This method does not modify the current object and returns if necessary a new copy. - * - * @return a new quantile summary object. - */ - private def withHeadBufferInserted: QuantileSummaries = { - if (headSampled.isEmpty) { - return this - } - var currentCount = count - val sorted = headSampled.toArray.sorted - val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]() - // The index of the next element to insert - var sampleIdx = 0 - // The index of the sample currently being inserted. - var opsIdx: Int = 0 - while(opsIdx < sorted.length) { - val currentSample = sorted(opsIdx) - // Add all the samples before the next observation. - while(sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) { - newSamples.append(sampled(sampleIdx)) - sampleIdx += 1 - } - - // If it is the first one to insert, of if it is the last one - currentCount += 1 - val delta = - if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) { - 0 - } else { - math.floor(2 * relativeError * currentCount).toInt - } - - val tuple = Stats(currentSample, 1, delta) - newSamples.append(tuple) - opsIdx += 1 - } - - // Add all the remaining existing samples - while(sampleIdx < sampled.size) { - newSamples.append(sampled(sampleIdx)) - sampleIdx += 1 - } - new QuantileSummaries(compressThreshold, relativeError, newSamples.toArray, currentCount) - } - - /** - * Returns a new summary that compresses the summary statistics and the head buffer. - * - * This implements the COMPRESS function of the GK algorithm. It does not modify the object. - * - * @return a new summary object with compressed statistics - */ - def compress(): QuantileSummaries = { - // Inserts all the elements first - val inserted = this.withHeadBufferInserted - assert(inserted.headSampled.isEmpty) - assert(inserted.count == count + headSampled.size) - val compressed = - compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count) - new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count) - } - - private def shallowCopy: QuantileSummaries = { - new QuantileSummaries(compressThreshold, relativeError, sampled, count) - } - - /** - * Merges two (compressed) summaries together. - * - * Returns a new summary. - */ - def merge(other: QuantileSummaries): QuantileSummaries = { - require(headSampled.isEmpty, "Current buffer needs to be compressed before merge") - require(other.headSampled.isEmpty, "Other buffer needs to be compressed before merge") - if (other.count == 0) { - this.shallowCopy - } else if (count == 0) { - other.shallowCopy - } else { - // Merge the two buffers. - // The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the - // statistics during the merging: the invariants are still respected after the merge. - // TODO: could replace full sort by ordered merge, the two lists are known to be sorted - // already. - val res = (sampled ++ other.sampled).sortBy(_.value) - val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count) - new QuantileSummaries( - other.compressThreshold, other.relativeError, comp, other.count + count) - } - } - - /** - * Runs a query for a given quantile. - * The result follows the approximation guarantees detailed above. - * The query can only be run on a compressed summary: you need to call compress() before using - * it. - * - * @param quantile the target quantile - * @return - */ - def query(quantile: Double): Double = { - require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]") - require(headSampled.isEmpty, - "Cannot operate on an uncompressed summary, call compress() first") - - if (quantile <= relativeError) { - return sampled.head.value - } - - if (quantile >= 1 - relativeError) { - return sampled.last.value - } - - // Target rank - val rank = math.ceil(quantile * count).toInt - val targetError = math.ceil(relativeError * count) - // Minimum rank at current sample - var minRank = 0 - var i = 1 - while (i < sampled.size - 1) { - val curSample = sampled(i) - minRank += curSample.g - val maxRank = minRank + curSample.delta - if (maxRank - targetError <= rank && rank <= minRank + targetError) { - return curSample.value - } - i += 1 - } - sampled.last.value - } - } - - object QuantileSummaries { - // TODO(tjhunter) more tuning could be done one the constants here, but for now - // the main cost of the algorithm is accessing the data in SQL. - /** - * The default value for the compression threshold. - */ - val defaultCompressThreshold: Int = 10000 - - /** - * The size of the head buffer. - */ - val defaultHeadSize: Int = 50000 - - /** - * The default value for the relative error (1%). - * With this value, the best extreme percentiles that can be approximated are 1% and 99%. - */ - val defaultRelativeError: Double = 0.01 - - /** - * Statistics from the Greenwald-Khanna paper. - * @param value the sampled value - * @param g the minimum rank jump from the previous value's minimum rank - * @param delta the maximum span of the rank. - */ - case class Stats(value: Double, g: Int, delta: Int) - - private def compressImmut( - currentSamples: IndexedSeq[Stats], - mergeThreshold: Double): Array[Stats] = { - if (currentSamples.isEmpty) { - return Array.empty[Stats] - } - val res: ArrayBuffer[Stats] = ArrayBuffer.empty - // Start for the last element, which is always part of the set. - // The head contains the current new head, that may be merged with the current element. - var head = currentSamples.last - var i = currentSamples.size - 2 - // Do not compress the last element - while (i >= 1) { - // The current sample: - val sample1 = currentSamples(i) - // Do we need to compress? - if (sample1.g + head.g + head.delta < mergeThreshold) { - // Do not insert yet, just merge the current element into the head. - head = head.copy(g = head.g + sample1.g) - } else { - // Prepend the current head, and keep the current sample as target for merging. - res.prepend(head) - head = sample1 - } - i -= 1 - } - res.prepend(head) - // If necessary, add the minimum element: - res.prepend(currentSamples.head) - res.toArray - } - } - /** Calculate the Pearson Correlation Coefficient for the given columns */ def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = { val counts = collectStatisticalData(df, cols, "correlation") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala deleted file mode 100644 index 0a989d026c..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.stat - -import scala.util.Random - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.stat.StatFunctions.QuantileSummaries - - -class ApproxQuantileSuite extends SparkFunSuite { - - private val r = new Random(1) - private val n = 100 - private val increasing = "increasing" -> (0 until n).map(_.toDouble) - private val decreasing = "decreasing" -> (n until 0 by -1).map(_.toDouble) - private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000)) - - private def buildSummary( - data: Seq[Double], - epsi: Double, - threshold: Int): QuantileSummaries = { - var summary = new QuantileSummaries(threshold, epsi) - data.foreach { x => - summary = summary.insert(x) - } - summary.compress() - } - - private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = { - val approx = summary.query(quant) - // The rank of the approximation. - val rank = data.count(_ < approx) // has to be <, not <= to be exact - val lower = math.floor((quant - summary.relativeError) * data.size) - val upper = math.ceil((quant + summary.relativeError) * data.size) - val msg = - s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx" - assert(rank >= lower, msg) - assert(rank <= upper, msg) - } - - for { - (seq_name, data) <- Seq(increasing, decreasing, random) - epsi <- Seq(0.1, 0.0001) - compression <- Seq(1000, 10) - } { - - test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") { - val s = buildSummary(data, epsi, compression) - val min_approx = s.query(0.0) - assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") - val max_approx = s.query(1.0) - assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") - } - - test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") { - val s = buildSummary(data, epsi, compression) - assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}") - checkQuantile(0.9999, data, s) - checkQuantile(0.9, data, s) - checkQuantile(0.5, data, s) - checkQuantile(0.1, data, s) - checkQuantile(0.001, data, s) - } - } - - // Tests for merging procedure - for { - (seq_name, data) <- Seq(increasing, decreasing, random) - epsi <- Seq(0.1, 0.0001) - compression <- Seq(1000, 10) - } { - - val (data1, data2) = { - val l = data.size - data.take(l / 2) -> data.drop(l / 2) - } - - test(s"Merging ordered lists with epsi=$epsi and seq=$seq_name, compression=$compression") { - val s1 = buildSummary(data1, epsi, compression) - val s2 = buildSummary(data2, epsi, compression) - val s = s1.merge(s2) - val min_approx = s.query(0.0) - assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") - val max_approx = s.query(1.0) - assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") - checkQuantile(0.9999, data, s) - checkQuantile(0.9, data, s) - checkQuantile(0.5, data, s) - checkQuantile(0.1, data, s) - checkQuantile(0.001, data, s) - } - - val (data11, data12) = { - data.sliding(2).map(_.head).toSeq -> data.sliding(2).map(_.last).toSeq - } - - test(s"Merging interleaved lists with epsi=$epsi and seq=$seq_name, compression=$compression") { - val s1 = buildSummary(data11, epsi, compression) - val s2 = buildSummary(data12, epsi, compression) - val s = s1.merge(s2) - val min_approx = s.query(0.0) - assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx") - val max_approx = s.query(1.0) - assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx") - checkQuantile(0.9999, data, s) - checkQuantile(0.9, data, s) - checkQuantile(0.5, data, s) - checkQuantile(0.1, data, s) - checkQuantile(0.001, data, s) - } - } - -} -- cgit v1.2.3