aboutsummaryrefslogtreecommitdiff
path: root/.github
diff options
context:
space:
mode:
authorDongjoon Hyun <dongjoon@apache.org>2016-09-30 21:05:06 -0700
committerHerman van Hovell <hvanhovell@databricks.com>2016-09-30 21:05:06 -0700
commitaef506e39a41cfe7198162c324a11ef2f01136c3 (patch)
tree0a246c3bead7b0ed0c095e46880c5367628a38c7 /.github
parenta26afd52198523dbd51dc94053424494638c7de5 (diff)
downloadspark-aef506e39a41cfe7198162c324a11ef2f01136c3.tar.gz
spark-aef506e39a41cfe7198162c324a11ef2f01136c3.tar.bz2
spark-aef506e39a41cfe7198162c324a11ef2f01136c3.zip
[SPARK-17739][SQL] Collapse adjacent similar Window operators
## What changes were proposed in this pull request? Currently, Spark does not collapse adjacent windows with the same partitioning and sorting. This PR implements `CollapseWindow` optimizer to do the followings. 1. If the partition specs and order specs are the same, collapse into the parent. 2. If the partition specs are the same and one order spec is a prefix of the other, collapse to the more specific one. For example: ```scala val df = spark.range(1000).select($"id" % 100 as "grp", $"id", rand() as "col1", rand() as "col2") // Add summary statistics for all columns import org.apache.spark.sql.expressions.Window val cols = Seq("id", "col1", "col2") val window = Window.partitionBy($"grp").orderBy($"id") val result = cols.foldLeft(df) { (base, name) => base.withColumn(s"${name}_avg", avg(col(name)).over(window)) .withColumn(s"${name}_stddev", stddev(col(name)).over(window)) .withColumn(s"${name}_min", min(col(name)).over(window)) .withColumn(s"${name}_max", max(col(name)).over(window)) } ``` **Before** ```scala scala> result.explain == Physical Plan == Window [max(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#234], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [min(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_min#216], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [stddev_samp(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_stddev#191], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [avg(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_avg#167], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [max(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_max#152], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [min(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_min#138], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [stddev_samp(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_stddev#117], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [avg(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_avg#97], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [max(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_max#86L], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [min(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_min#76L], [grp#17L], [id#14L ASC NULLS FIRST] +- *Project [grp#17L, id#14L, col1#18, col2#19, id_avg#26, id_stddev#42] +- Window [stddev_samp(_w0#59) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_stddev#42], [grp#17L], [id#14L ASC NULLS FIRST] +- *Project [grp#17L, id#14L, col1#18, col2#19, id_avg#26, cast(id#14L as double) AS _w0#59] +- Window [avg(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_avg#26], [grp#17L], [id#14L ASC NULLS FIRST] +- *Sort [grp#17L ASC NULLS FIRST, id#14L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(grp#17L, 200) +- *Project [(id#14L % 100) AS grp#17L, id#14L, rand(-6329949029880411066) AS col1#18, rand(-7251358484380073081) AS col2#19] +- *Range (0, 1000, step=1, splits=Some(8)) ``` **After** ```scala scala> result.explain == Physical Plan == Window [max(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#220, min(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_min#202, stddev_samp(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_stddev#177, avg(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_avg#153, max(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_max#138, min(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_min#124, stddev_samp(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_stddev#103, avg(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_avg#83, max(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_max#72L, min(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_min#62L], [grp#3L], [id#0L ASC NULLS FIRST] +- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, id_stddev#28] +- Window [stddev_samp(_w0#45) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_stddev#28], [grp#3L], [id#0L ASC NULLS FIRST] +- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, cast(id#0L as double) AS _w0#45] +- Window [avg(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_avg#12], [grp#3L], [id#0L ASC NULLS FIRST] +- *Sort [grp#3L ASC NULLS FIRST, id#0L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(grp#3L, 200) +- *Project [(id#0L % 100) AS grp#3L, id#0L, rand(6537478539664068821) AS col1#4, rand(-8961093871295252795) AS col2#5] +- *Range (0, 1000, step=1, splits=Some(8)) ``` ## How was this patch tested? Pass the Jenkins tests with a newly added testsuite. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #15317 from dongjoon-hyun/SPARK-17739.
Diffstat (limited to '.github')
0 files changed, 0 insertions, 0 deletions