aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst/tests
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@questtec.nl>2016-05-12 13:56:00 -0700
committerReynold Xin <rxin@databricks.com>2016-05-12 13:56:00 -0700
commitbb1362eb3b36b553dca246b95f59ba7fd8adcc8a (patch)
treed47608b04e9bc54f6a77cf6648f157982aa8788a /R/pkg/inst/tests
parenta57aadae84aca27e5f02ac0bd64fd0ea34a64b61 (diff)
downloadspark-bb1362eb3b36b553dca246b95f59ba7fd8adcc8a.tar.gz
spark-bb1362eb3b36b553dca246b95f59ba7fd8adcc8a.tar.bz2
spark-bb1362eb3b36b553dca246b95f59ba7fd8adcc8a.zip
[SPARK-10605][SQL] Create native collect_list/collect_set aggregates
## What changes were proposed in this pull request? We currently use the Hive implementations for the collect_list/collect_set aggregate functions. This has a few major drawbacks: the use of HiveUDAF (which has quite a bit of overhead) and the lack of support for struct datatypes. This PR adds native implementation of these functions to Spark. The size of the collected list/set may vary, this means we cannot use the fast, Tungsten, aggregation path to perform the aggregation, and that we fallback to the slower sort based path. Another big issue with these operators is that when the size of the collected list/set grows too large, we can start experiencing large GC pauzes and OOMEs. This `collect*` aggregates implemented in this PR rely on the sort based aggregate path for correctness. They maintain their own internal buffer which holds the rows for one group at a time. The sortbased aggregation path is triggered by disabling `partialAggregation` for these aggregates (which is kinda funny); this technique is also employed in `org.apache.spark.sql.hiveHiveUDAFFunction`. I have done some performance testing: ```scala import org.apache.spark.sql.{Dataset, Row} sql("create function collect_list2 as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList'") val df = range(0, 10000000).select($"id", (rand(213123L) * 100000).cast("int").as("grp")) df.select(countDistinct($"grp")).show def benchmark(name: String, plan: Dataset[Row], maxItr: Int = 5): Unit = { // Do not measure planning. plan1.queryExecution.executedPlan // Execute the plan a number of times and average the result. val start = System.nanoTime var i = 0 while (i < maxItr) { plan.rdd.foreach(row => Unit) i += 1 } val time = (System.nanoTime - start) / (maxItr * 1000000L) println(s"[$name] $maxItr iterations completed in an average time of $time ms.") } val plan1 = df.groupBy($"grp").agg(collect_list($"id")) val plan2 = df.groupBy($"grp").agg(callUDF("collect_list2", $"id")) benchmark("Spark collect_list", plan1) ... > [Spark collect_list] 5 iterations completed in an average time of 3371 ms. benchmark("Hive collect_list", plan2) ... > [Hive collect_list] 5 iterations completed in an average time of 9109 ms. ``` Performance is improved by a factor 2-3. ## How was this patch tested? Added tests to `DataFrameAggregateSuite`. Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #12874 from hvanhovell/implode.
Diffstat (limited to 'R/pkg/inst/tests')
0 files changed, 0 insertions, 0 deletions