aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala53
3 files changed, 59 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 16fb1f6837..4bd9ee03f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -584,15 +584,18 @@ class CodegenContext {
* @param expressions the codes to evaluate expressions.
*/
def splitExpressions(row: String, expressions: Seq[String]): String = {
- if (row == null) {
+ if (row == null || currentVars != null) {
// Cannot split these expressions because they are not created from a row object.
return expressions.mkString("\n")
}
val blocks = new ArrayBuffer[String]()
val blockBuilder = new StringBuilder()
for (code <- expressions) {
- // We can't know how many byte code will be generated, so use the number of bytes as limit
- if (blockBuilder.length > 64 * 1000) {
+ // We can't know how much bytecode will be generated, so use the length of source code
+ // as the metric. A method should not exceed 8K of bytecode, or it will not be JITted;
+ // it should also not be too small, or there will be many function calls (for wide
+ // tables). See the results in BenchmarkWideTable.
+ if (blockBuilder.length > 1024) {
blocks.append(blockBuilder.toString())
blockBuilder.clear()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index cfc47aba88..bd7efa606e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -603,8 +603,6 @@ case class HashAggregateExec(
// create grouping key
ctx.currentVars = input
- // make sure that the generated code will not be splitted as multiple functions
- ctx.INPUT_ROW = null
val unsafeRowKeyCode = GenerateUnsafeProjection.createCode(
ctx, groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output)))
val vectorizedRowKeys = ctx.generateExpressions(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala
new file mode 100644
index 0000000000..9dcaca0ca9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.util.Benchmark
+
+
+/**
+ * Benchmark to measure performance on wide tables.
+ * To run this:
+ * build/sbt "sql/test-only *benchmark.BenchmarkWideTable"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class BenchmarkWideTable extends BenchmarkBase {
+
+ ignore("project on wide table") {
+ val N = 1 << 20
+ val df = sparkSession.range(N)
+ val columns = (0 until 400).map{ i => s"id as id$i"}
+ val benchmark = new Benchmark("projection on wide table", N)
+ benchmark.addCase("wide table", numIters = 5) { iter =>
+ df.selectExpr(columns : _*).queryExecution.toRdd.count()
+ }
+ benchmark.run()
+
+ /**
+ * Here are some numbers with different split thresholds (source-code length in characters):
+ *
+ * Split threshold methods Rate(M/s) Per Row(ns)
+ * 10 400 0.4 2279
+ * 100 200 0.6 1554
+ * 1k 37 0.9 1116
+ * 8k 5 0.5 2025
+ * 64k 1 0.0 21649
+ */
+ }
+}