diff options
author | Ergin Seyfe <eseyfe@fb.com> | 2016-09-14 09:51:14 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-09-14 09:51:14 +0100 |
commit | 4cea9da2ae88b40a5503111f8f37051e2372163e (patch) | |
tree | e040fbb42d09904fe1123fc3af2069c0a8cbfde2 | |
parent | 18b4f035f40359b3164456d0dab52dbc762ea3b4 (diff) | |
download | spark-4cea9da2ae88b40a5503111f8f37051e2372163e.tar.gz spark-4cea9da2ae88b40a5503111f8f37051e2372163e.tar.bz2 spark-4cea9da2ae88b40a5503111f8f37051e2372163e.zip |
[SPARK-17480][SQL] Improve performance by removing or caching List.length which is O(n)
## What changes were proposed in this pull request?
Scala's List.length method is O(N) and it makes the gatherCompressibilityStats function O(N^2). Eliminate the List.length calls by writing it in Scala way.
https://github.com/scala/scala/blob/2.10.x/src/library/scala/collection/LinearSeqOptimized.scala#L36
As suggested. Extended the fix to HiveInspectors and AggregationIterator classes as well.
## How was this patch tested?
Profiled a Spark job and found that CompressibleColumnBuilder is using 39% of the CPU. Out of this 39% CompressibleColumnBuilder->gatherCompressibilityStats is using 23% of it. 6.24% of the CPU is spend on List.length which is called inside gatherCompressibilityStats.
After this change we started to save 6.24% of the CPU.
Author: Ergin Seyfe <eseyfe@fb.com>
Closes #15032 from seyfe/gatherCompressibilityStats.
3 files changed, 9 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala index dfed084fe6..f335912ba2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala @@ -73,9 +73,10 @@ abstract class AggregationIterator( startingInputBufferOffset: Int): Array[AggregateFunction] = { var mutableBufferOffset = 0 var inputBufferOffset: Int = startingInputBufferOffset - val functions = new Array[AggregateFunction](expressions.length) + val expressionsLength = expressions.length + val functions = new Array[AggregateFunction](expressionsLength) var i = 0 - while (i < expressions.length) { + while (i < expressionsLength) { val func = expressions(i).aggregateFunction val funcWithBoundReferences: AggregateFunction = expressions(i).mode match { case Partial | Complete if func.isInstanceOf[ImperativeAggregate] => @@ -171,7 +172,7 @@ abstract class AggregationIterator( case PartialMerge | Final => (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row) } - } + }.toArray // This projection is used to merge buffer values for all expression-based aggregates. val aggregationBufferSchema = functions.flatMap(_.aggBufferAttributes) val updateProjection = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala index 63eae1b868..0f4680e502 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala @@ -66,11 +66,7 @@ private[columnar] trait CompressibleColumnBuilder[T <: AtomicType] } private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { - var i = 0 - while (i < compressionEncoders.length) { - compressionEncoders(i).gatherCompressibilityStats(row, ordinal) - i += 1 - } + compressionEncoders.foreach(_.gatherCompressibilityStats(row, ordinal)) } abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index bf5cc17a68..4e74452f6c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -756,7 +756,8 @@ private[hive] trait HiveInspectors { cache: Array[AnyRef], dataTypes: Array[DataType]): Array[AnyRef] = { var i = 0 - while (i < inspectors.length) { + val length = inspectors.length + while (i < length) { cache(i) = wrap(row.get(i, dataTypes(i)), inspectors(i), dataTypes(i)) i += 1 } @@ -769,7 +770,8 @@ private[hive] trait HiveInspectors { cache: Array[AnyRef], dataTypes: Array[DataType]): Array[AnyRef] = { var i = 0 - while (i < inspectors.length) { + val length = inspectors.length + while (i < length) { cache(i) = wrap(row(i), inspectors(i), dataTypes(i)) i += 1 } |