From 86c2d393a56bf1e5114bc5a781253c0460efb8af Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sat, 17 Sep 2016 16:52:30 +0100
Subject: [SPARK-17480][SQL][FOLLOWUP] Fix more instances which calls List.length/size which is O(n)

## What changes were proposed in this pull request?

This PR fixes the remaining instances that were not covered by the previous PR.

To make sure, I manually debugged and also checked the Scala source. `length` in [LinearSeqOptimized.scala#L49-L57](https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/LinearSeqOptimized.scala#L49-L57) is O(n). Also, `size` calls `length` via [SeqLike.scala#L106](https://github.com/scala/scala/blob/2.11.x/src/library/scala/collection/SeqLike.scala#L106).

For debugging, I created the collections below:

```scala
ArrayBuffer(1, 2, 3)
Array(1, 2, 3)
List(1, 2, 3)
Seq(1, 2, 3)
```

and then called `size` and `length` on each to debug.

## How was this patch tested?

I ran the bash commands below on Mac

```bash
find . -name *.scala -type f -exec grep -il "while (.*\\.length)" {} \; | grep "src/main"
find . -name *.scala -type f -exec grep -il "while (.*\\.size)" {} \; | grep "src/main"
```

and then checked each result.

Author: hyukjinkwon

Closes #15093 from HyukjinKwon/SPARK-17480-followup.
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 28 +++++++---------------
 .../expressions/conditionalExpressions.scala       |  3 ++-
 .../spark/sql/catalyst/expressions/ordering.scala  |  3 ++-
 .../sql/catalyst/util/QuantileSummaries.scala      | 10 ++++----
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  2 +-
 .../org/apache/spark/sql/hive/HiveInspectors.scala |  6 +++--
 .../org/apache/spark/sql/hive/TableReader.scala    |  3 ++-
 .../scala/org/apache/spark/sql/hive/hiveUDFs.scala |  3 ++-
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala  |  6 +++--
 9 files changed, 31 insertions(+), 33 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5210f42c55..cc62d5e7c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1663,27 +1663,17 @@ class Analyzer(
         }
       }.toSeq
 
-      // Third, for every Window Spec, we add a Window operator and set currentChild as the
-      // child of it.
-      var currentChild = child
-      var i = 0
-      while (i < groupedWindowExpressions.size) {
-        val ((partitionSpec, orderSpec), windowExpressions) = groupedWindowExpressions(i)
-        // Set currentChild to the newly created Window operator.
-        currentChild =
-          Window(
-            windowExpressions,
-            partitionSpec,
-            orderSpec,
-            currentChild)
-
-        // Move to next Window Spec.
-        i += 1
-      }
+      // Third, we aggregate them by adding each Window operator for each Window Spec and then
+      // setting this to the child of the next Window operator.
+      val windowOps =
+        groupedWindowExpressions.foldLeft(child) {
+          case (last, ((partitionSpec, orderSpec), windowExpressions)) =>
+            Window(windowExpressions, partitionSpec, orderSpec, last)
+        }
 
-      // Finally, we create a Project to output currentChild's output
+      // Finally, we create a Project to output windowOps's output
       // newExpressionsWithWindowFunctions.
-      Project(currentChild.output ++ newExpressionsWithWindowFunctions, currentChild)
+      Project(windowOps.output ++ newExpressionsWithWindowFunctions, windowOps)
     } // end of addWindow
 
     // We have to use transformDown at here to make sure the rule of
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index 1dd70bcfcf..71d4e9a3c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -125,7 +125,8 @@ abstract class CaseWhenBase(
 
   override def eval(input: InternalRow): Any = {
     var i = 0
-    while (i < branches.size) {
+    val size = branches.size
+    while (i < size) {
       if (java.lang.Boolean.TRUE.equals(branches(i)._1.eval(input))) {
         return branches(i)._2.eval(input)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
index 79d2052c38..e24a3de3cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -31,7 +31,8 @@ class InterpretedOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow
 
   def compare(a: InternalRow, b: InternalRow): Int = {
     var i = 0
-    while (i < ordering.size) {
+    val size = ordering.size
+    while (i < size) {
       val order = ordering(i)
       val left = order.child.eval(a)
       val right = order.child.eval(b)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index fd62bd511f..27928c493d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -91,10 +91,10 @@ class QuantileSummaries(
     var sampleIdx = 0
     // The index of the sample currently being inserted.
     var opsIdx: Int = 0
-    while(opsIdx < sorted.length) {
+    while (opsIdx < sorted.length) {
       val currentSample = sorted(opsIdx)
       // Add all the samples before the next observation.
-      while(sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) {
+      while (sampleIdx < sampled.length && sampled(sampleIdx).value <= currentSample) {
         newSamples += sampled(sampleIdx)
         sampleIdx += 1
       }
@@ -102,7 +102,7 @@
       // If it is the first one to insert, of if it is the last one
       currentCount += 1
       val delta =
-        if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
+        if (newSamples.isEmpty || (sampleIdx == sampled.length && opsIdx == sorted.length - 1)) {
           0
         } else {
           math.floor(2 * relativeError * currentCount).toInt
@@ -114,7 +114,7 @@
     }
 
     // Add all the remaining existing samples
-    while(sampleIdx < sampled.size) {
+    while (sampleIdx < sampled.length) {
       newSamples += sampled(sampleIdx)
       sampleIdx += 1
     }
@@ -195,7 +195,7 @@
     // Minimum rank at current sample
     var minRank = 0
     var i = 1
-    while (i < sampled.size - 1) {
+    while (i < sampled.length - 1) {
       val curSample = sampled(i)
       minRank += curSample.g
       val maxRank = minRank + curSample.delta
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b09fd511a9..3db1d1f109 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -369,7 +369,7 @@ object JdbcUtils extends Logging {
         val bytes = rs.getBytes(pos + 1)
         var ans = 0L
         var j = 0
-        while (j < bytes.size) {
+        while (j < bytes.length) {
           ans = 256 * ans + (255 & bytes(j))
           j = j + 1
         }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 4e74452f6c..e4b963efea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -703,7 +703,8 @@ private[hive] trait HiveInspectors {
       // 1. create the pojo (most likely) object
       val result = x.create()
       var i = 0
-      while (i < fieldRefs.size) {
+      val size = fieldRefs.size
+      while (i < size) {
         // 2. set the property for the pojo
         val tpe = structType(i).dataType
         x.setStructFieldData(
@@ -720,7 +721,8 @@
       val row = a.asInstanceOf[InternalRow]
       val result = new java.util.ArrayList[AnyRef](fieldRefs.size)
       var i = 0
-      while (i < fieldRefs.size) {
+      val size = fieldRefs.size
+      while (i < size) {
         val tpe = structType(i).dataType
         result.add(wrap(row.get(i, tpe), fieldRefs.get(i).getFieldObjectInspector, tpe))
         i += 1
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index b4808fdbed..ec7e53efc8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -427,7 +427,8 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
     iterator.map { value =>
       val raw = converter.convert(rawDeser.deserialize(value))
       var i = 0
-      while (i < fieldRefs.length) {
+      val length = fieldRefs.length
+      while (i < length) {
        val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
        if (fieldValue == null) {
          mutableRow.setNullAt(fieldOrdinals(i))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 9347aeb8e0..962dd5a52e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -153,7 +153,8 @@ private[hive] case class HiveGenericUDF(
     returnInspector // Make sure initialized.
 
     var i = 0
-    while (i < children.length) {
+    val length = children.length
+    while (i < length) {
       val idx = i
       deferredObjects(i).asInstanceOf[DeferredObjectAdapter]
         .set(() => children(idx).eval(input))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 286197b50e..03b508e11a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -190,7 +190,8 @@ private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
       row: InternalRow): Unit = {
     val fieldRefs = oi.getAllStructFieldRefs
     var i = 0
-    while (i < fieldRefs.size) {
+    val size = fieldRefs.size
+    while (i < size) {
       oi.setStructFieldData(
         struct,
@@ -289,7 +290,8 @@ private[orc] object OrcRelation extends HiveInspectors {
       iterator.map { value =>
         val raw = deserializer.deserialize(value)
         var i = 0
-        while (i < fieldRefs.length) {
+        val length = fieldRefs.length
+        while (i < length) {
           val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
           if (fieldValue == null) {
             mutableRow.setNullAt(fieldOrdinals(i))
--
cgit v1.2.3
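For reference, below is a minimal, self-contained Scala sketch of the pattern this patch applies throughout; it is not part of the patch, and the object name `LengthHoistingSketch` is purely illustrative. It shows why re-evaluating `.size`/`.length` of a `List` in a `while` condition is costly and how hoisting the value into a local fixes it.

```scala
object LengthHoistingSketch {
  def main(args: Array[String]): Unit = {
    val branches: List[String] = List.tabulate(5)(i => s"branch-$i")

    // Before: `branches.size` is re-evaluated on every check. For a List this
    // walks the whole list each time, so the size checks alone cost O(n^2).
    var i = 0
    while (i < branches.size) {
      println(branches(i))
      i += 1
    }

    // After (the pattern used in this patch): compute the size once up front
    // and compare against the local value in the loop condition.
    val size = branches.size
    var j = 0
    while (j < size) {
      println(branches(j))
      j += 1
    }
    // Note: branches(j) on a List is itself linear; the patch only targets the
    // repeated length/size computation in the loop condition.
  }
}
```

In the `QuantileSummaries` and `JdbcUtils` hunks the receivers are arrays, where `length` is already O(1); switching `.size` to `.length` there mainly avoids going through the implicit `ArrayOps` wrapper on every check.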