aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorKazuaki Ishizaki <ishizaki@jp.ibm.com>2017-03-26 09:20:22 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2017-03-26 09:20:22 +0200
commit93bb0b911b6c790fa369b39da51a83d8f62da909 (patch)
tree650554cc784f06fda812758a537a4ec0bc96766c /sql/core
parent2422c86f2ce2dd649b1d63062ec5c5fc1716c519 (diff)
downloadspark-93bb0b911b6c790fa369b39da51a83d8f62da909.tar.gz
spark-93bb0b911b6c790fa369b39da51a83d8f62da909.tar.bz2
spark-93bb0b911b6c790fa369b39da51a83d8f62da909.zip
[SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler regarding sqlContext.read.parquet()
## What changes were proposed in this pull request? This PR improves performance of operations with `sqlContext.read.parquet()` by changing Java code generated by Catalyst. This PR is inspired by [the blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html) and [this stackoverflow entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark). This PR changes generated code in the following two points. 1. Replace a while-loop with long instance variables a for-loop with int local variables 2. Suppress generation of `shouldStop()` method if this method is unnecessary (e.g. `append()` is not generated). These points facilitates compiler optimizations in a JIT compiler by feeding the simplified Java code into the JIT compiler. The performance of `sqlContext.read.parquet().count` is improved by 1.09x. Benchmark program: ```java val dir = "/dev/shm/parquet" val N = 1000 * 1000 * 40 val iters = 20 val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, warmupTime = 30.seconds) sparkSession.range(n).write.mode("overwrite").parquet(dir) benchmark.addCase("count") { i: Int => var n = 0 var len = 0L while (n < iters) { len += sparkSession.read.parquet(dir).count n += 1 } } benchmark.run ``` Performance result without this PR ``` OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Parquet: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ w/o this PR 1152 / 1211 694.7 1.4 1.0X ``` Performance result with this PR ``` OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz Parquet: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ with this PR 1053 / 1121 760.0 1.3 1.0X ``` Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed. Generated code without this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private scala.collection.Iterator scan_input; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows; /* 013 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime; /* 014 */ private long scan_scanTime1; /* 015 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch; /* 016 */ private int scan_batchIdx; /* 017 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 019 */ private UnsafeRow agg_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 022 */ /* 023 */ public GeneratedIterator(Object[] references) { /* 024 */ this.references = references; /* 025 */ } /* 026 */ /* 027 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 028 */ partitionIndex = index; /* 029 */ this.inputs = inputs; /* 030 */ agg_initAgg = false; /* 031 */ /* 032 */ scan_input = inputs[0]; /* 033 */ this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 034 */ this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 035 */ scan_scanTime1 = 0; /* 036 */ scan_batch = null; /* 037 */ scan_batchIdx = 0; /* 038 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 039 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 040 */ agg_result = new UnsafeRow(1); /* 041 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 042 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 043 */ /* 044 */ } /* 045 */ /* 046 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 047 */ // initialize aggregation buffer /* 048 */ agg_bufIsNull = false; /* 049 */ agg_bufValue = 0L; /* 050 */ /* 051 */ if (scan_batch == null) { /* 052 */ scan_nextBatch(); /* 053 */ } /* 054 */ while (scan_batch != null) { /* 055 */ int numRows = scan_batch.numRows(); /* 056 */ while (scan_batchIdx < numRows) { /* 057 */ int scan_rowIdx = scan_batchIdx++; /* 058 */ // do aggregate /* 059 */ // common sub-expressions /* 060 */ /* 061 */ // evaluate aggregate function /* 062 */ boolean agg_isNull1 = false; /* 063 */ /* 064 */ long agg_value1 = -1L; /* 065 */ agg_value1 = agg_bufValue + 1L; /* 066 */ // update aggregation buffer /* 067 */ agg_bufIsNull = false; /* 068 */ agg_bufValue = agg_value1; /* 069 */ if (shouldStop()) return; /* 070 */ } /* 071 */ scan_batch = null; /* 072 */ scan_nextBatch(); /* 073 */ } /* 074 */ scan_scanTime.add(scan_scanTime1 / (1000 * 1000)); /* 075 */ scan_scanTime1 = 0; /* 076 */ /* 077 */ } /* 078 */ /* 079 */ private void scan_nextBatch() throws java.io.IOException { /* 080 */ long getBatchStart = System.nanoTime(); /* 081 */ if (scan_input.hasNext()) { /* 082 */ scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next(); /* 083 */ scan_numOutputRows.add(scan_batch.numRows()); /* 084 */ scan_batchIdx = 0; /* 085 */ /* 086 */ } /* 087 */ scan_scanTime1 += System.nanoTime() - getBatchStart; /* 088 */ } /* 089 */ /* 090 */ protected void processNext() throws java.io.IOException { /* 091 */ while (!agg_initAgg) { /* 092 */ agg_initAgg = true; /* 093 */ long agg_beforeAgg = System.nanoTime(); /* 094 */ agg_doAggregateWithoutKey(); /* 095 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 096 */ /* 097 */ // output the result /* 098 */ /* 099 */ agg_numOutputRows.add(1); /* 100 */ agg_rowWriter.zeroOutNullBytes(); /* 101 */ /* 102 */ if (agg_bufIsNull) { /* 103 */ agg_rowWriter.setNullAt(0); /* 104 */ } else { /* 105 */ agg_rowWriter.write(0, agg_bufValue); /* 106 */ } /* 107 */ append(agg_result); /* 108 */ } /* 109 */ } /* 110 */ } ``` Generated code with this PR ```java /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private boolean agg_initAgg; /* 009 */ private boolean agg_bufIsNull; /* 010 */ private long agg_bufValue; /* 011 */ private scala.collection.Iterator scan_input; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows; /* 013 */ private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime; /* 014 */ private long scan_scanTime1; /* 015 */ private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch; /* 016 */ private int scan_batchIdx; /* 017 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows; /* 018 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime; /* 019 */ private UnsafeRow agg_result; /* 020 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 022 */ /* 023 */ public GeneratedIterator(Object[] references) { /* 024 */ this.references = references; /* 025 */ } /* 026 */ /* 027 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 028 */ partitionIndex = index; /* 029 */ this.inputs = inputs; /* 030 */ agg_initAgg = false; /* 031 */ /* 032 */ scan_input = inputs[0]; /* 033 */ this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0]; /* 034 */ this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1]; /* 035 */ scan_scanTime1 = 0; /* 036 */ scan_batch = null; /* 037 */ scan_batchIdx = 0; /* 038 */ this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2]; /* 039 */ this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3]; /* 040 */ agg_result = new UnsafeRow(1); /* 041 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 042 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 043 */ /* 044 */ } /* 045 */ /* 046 */ private void agg_doAggregateWithoutKey() throws java.io.IOException { /* 047 */ // initialize aggregation buffer /* 048 */ agg_bufIsNull = false; /* 049 */ agg_bufValue = 0L; /* 050 */ /* 051 */ if (scan_batch == null) { /* 052 */ scan_nextBatch(); /* 053 */ } /* 054 */ while (scan_batch != null) { /* 055 */ int numRows = scan_batch.numRows(); /* 056 */ int scan_localEnd = numRows - scan_batchIdx; /* 057 */ for (int scan_localIdx = 0; scan_localIdx < scan_localEnd; scan_localIdx++) { /* 058 */ int scan_rowIdx = scan_batchIdx + scan_localIdx; /* 059 */ // do aggregate /* 060 */ // common sub-expressions /* 061 */ /* 062 */ // evaluate aggregate function /* 063 */ boolean agg_isNull1 = false; /* 064 */ /* 065 */ long agg_value1 = -1L; /* 066 */ agg_value1 = agg_bufValue + 1L; /* 067 */ // update aggregation buffer /* 068 */ agg_bufIsNull = false; /* 069 */ agg_bufValue = agg_value1; /* 070 */ // shouldStop check is eliminated /* 071 */ } /* 072 */ scan_batchIdx = numRows; /* 073 */ scan_batch = null; /* 074 */ scan_nextBatch(); /* 075 */ } /* 079 */ } /* 080 */ /* 081 */ private void scan_nextBatch() throws java.io.IOException { /* 082 */ long getBatchStart = System.nanoTime(); /* 083 */ if (scan_input.hasNext()) { /* 084 */ scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next(); /* 085 */ scan_numOutputRows.add(scan_batch.numRows()); /* 086 */ scan_batchIdx = 0; /* 087 */ /* 088 */ } /* 089 */ scan_scanTime1 += System.nanoTime() - getBatchStart; /* 090 */ } /* 091 */ /* 092 */ protected void processNext() throws java.io.IOException { /* 093 */ while (!agg_initAgg) { /* 094 */ agg_initAgg = true; /* 095 */ long agg_beforeAgg = System.nanoTime(); /* 096 */ agg_doAggregateWithoutKey(); /* 097 */ agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000); /* 098 */ /* 099 */ // output the result /* 100 */ /* 101 */ agg_numOutputRows.add(1); /* 102 */ agg_rowWriter.zeroOutNullBytes(); /* 103 */ /* 104 */ if (agg_bufIsNull) { /* 105 */ agg_rowWriter.setNullAt(0); /* 106 */ } else { /* 107 */ agg_rowWriter.write(0, agg_bufValue); /* 108 */ } /* 109 */ append(agg_result); /* 110 */ } /* 111 */ } /* 112 */ } ``` ## How was this patch tested? Tested existing test suites Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #17378 from kiszk/SPARK-20046.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala18
1 files changed, 14 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index 04fba17be4..e86116680a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -111,17 +111,27 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
}
+ val localIdx = ctx.freshName("localIdx")
+ val localEnd = ctx.freshName("localEnd")
+ val numRows = ctx.freshName("numRows")
+ val shouldStop = if (isShouldStopRequired) {
+ s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
+ } else {
+ "// shouldStop check is eliminated"
+ }
s"""
|if ($batch == null) {
| $nextBatch();
|}
|while ($batch != null) {
- | int numRows = $batch.numRows();
- | while ($idx < numRows) {
- | int $rowidx = $idx++;
+ | int $numRows = $batch.numRows();
+ | int $localEnd = $numRows - $idx;
+ | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+ | int $rowidx = $idx + $localIdx;
| ${consume(ctx, columnsBatchInput).trim}
- | if (shouldStop()) return;
+ | $shouldStop
| }
+ | $idx = $numRows;
| $batch = null;
| $nextBatch();
|}