about summary refs log tree commit diff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-12-26 22:10:20 +0800
committerWenchen Fan <wenchen@databricks.com>2016-12-26 22:10:20 +0800
commit8a7db8a608a9e27b10f205cc1b4ed5f2c3e83799 (patch)
treed7ce89b11fb0dbb9becb7fbf2e3358afc34fd0b3 /sql/core/src/test
parent7026ee23e0a684e13f9d7dfbb8f85e810106d022 (diff)
downloadspark-8a7db8a608a9e27b10f205cc1b4ed5f2c3e83799.tar.gz
spark-8a7db8a608a9e27b10f205cc1b4ed5f2c3e83799.tar.bz2
spark-8a7db8a608a9e27b10f205cc1b4ed5f2c3e83799.zip
[SPARK-18980][SQL] implement Aggregator with TypedImperativeAggregate
## What changes were proposed in this pull request? Currently we implement `Aggregator` with `DeclarativeAggregate`, which will serialize/deserialize the buffer object every time we process an input. This PR implements `Aggregator` with `TypedImperativeAggregate` and avoids serializing/deserializing the buffer object many times. The benchmark shows we get about a 2x speedup. For a simple buffer object that doesn't need serialization, we still go with `DeclarativeAggregate`, to avoid a performance regression. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #16383 from cloud-fan/aggregator.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala12
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala6
2 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
index c11605d175..66d94d6016 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
@@ -192,14 +192,14 @@ object DatasetBenchmark {
benchmark2.run()
/*
- OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
- Intel Xeon E3-12xx v2 (Ivy Bridge)
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
+ Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
- RDD sum 1420 / 1523 70.4 14.2 1.0X
- DataFrame sum 31 / 49 3214.3 0.3 45.6X
- Dataset sum using Aggregator 3216 / 3257 31.1 32.2 0.4X
- Dataset complex Aggregator 7948 / 8461 12.6 79.5 0.2X
+ RDD sum 1913 / 1942 52.3 19.1 1.0X
+ DataFrame sum 46 / 61 2157.7 0.5 41.3X
+ Dataset sum using Aggregator 4656 / 4758 21.5 46.6 0.4X
+ Dataset complex Aggregator 6636 / 7039 15.1 66.4 0.3X
*/
benchmark3.run()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
index 70c39518ab..b76f168220 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
@@ -240,7 +240,7 @@ object TypedImperativeAggregateSuite {
new MaxValue(Int.MinValue)
}
- override def update(buffer: MaxValue, input: InternalRow): Unit = {
+ override def update(buffer: MaxValue, input: InternalRow): MaxValue = {
child.eval(input) match {
case inputValue: Int =>
if (inputValue > buffer.value) {
@@ -249,13 +249,15 @@ object TypedImperativeAggregateSuite {
}
case null => // skip
}
+ buffer
}
- override def merge(bufferMax: MaxValue, inputMax: MaxValue): Unit = {
+ override def merge(bufferMax: MaxValue, inputMax: MaxValue): MaxValue = {
if (inputMax.value > bufferMax.value) {
bufferMax.value = inputMax.value
bufferMax.isValueSet = bufferMax.isValueSet || inputMax.isValueSet
}
+ bufferMax
}
override def eval(bufferMax: MaxValue): Any = {