author    Kazuaki Ishizaki <ishizaki@jp.ibm.com>          2017-03-26 09:20:22 +0200
committer Herman van Hovell <hvanhovell@databricks.com>   2017-03-26 09:20:22 +0200
commit    93bb0b911b6c790fa369b39da51a83d8f62da909
tree      650554cc784f06fda812758a537a4ec0bc96766c
parent    2422c86f2ce2dd649b1d63062ec5c5fc1716c519
[SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler regarding sqlContext.read.parquet()
## What changes were proposed in this pull request?
This PR improves the performance of operations that read data via `sqlContext.read.parquet()` by changing the Java code generated by Catalyst. It is inspired by [this blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html) and [this Stack Overflow entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark).
This PR changes the generated code in two ways:
1. Replace a while-loop driven by instance variables with a for-loop driven by int local variables
2. Suppress generation of the `shouldStop()` check when it is unnecessary (e.g. when no `append()` call is generated)
These changes feed simpler Java code to the JIT compiler and thereby facilitate its loop optimizations: an int local induction variable is easier for the compiler to analyze than an instance field, whose every update is a memory write, and dropping the `shouldStop()` check removes an early exit from the loop body. The performance of `sqlContext.read.parquet().count` is improved by 1.09x.
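For illustration only, the transformation has the following shape, distilled from the full code-generation dumps shown later in this description (`LoopShapes`, `batchIdx`, and `consume()` are made-up stand-ins for the generated names):
```java
// Illustration of the two loop shapes; not the verbatim generated code.
class LoopShapes {
  private int batchIdx = 0;   // plays the role of the generated scan_batchIdx field
  private long count = 0;

  private void consume(int rowIdx) { count += 1L; }  // stands for the per-row aggregation
  private boolean shouldStop() { return false; }

  // Before: a while-loop over an instance field, with a shouldStop() check on
  // every iteration; the field updates and the possible early return inhibit
  // JIT loop optimizations such as unrolling.
  void before(int numRows) {
    while (batchIdx < numRows) {
      int rowIdx = batchIdx++;
      consume(rowIdx);
      if (shouldStop()) return;
    }
  }

  // After: a for-loop over an int local induction variable; the field is
  // written back once after the loop, and the shouldStop() check is dropped
  // because this loop cannot call append().
  void after(int numRows) {
    int localEnd = numRows - batchIdx;
    for (int localIdx = 0; localIdx < localEnd; localIdx++) {
      int rowIdx = batchIdx + localIdx;
      consume(rowIdx);
    }
    batchIdx = numRows;
  }
}
```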
Benchmark program:
```scala
import scala.concurrent.duration._  // needed for the 30.seconds warmup time

// Uses Spark's internal benchmark harness, org.apache.spark.util.Benchmark.
val dir = "/dev/shm/parquet"
val N = 1000 * 1000 * 40
val iters = 20
val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, warmupTime = 30.seconds)
sparkSession.range(N).write.mode("overwrite").parquet(dir)
benchmark.addCase("count") { i: Int =>
  var n = 0
  var len = 0L
  while (n < iters) {
    len += sparkSession.read.parquet(dir).count
    n += 1
  }
}
benchmark.run
```
Performance result without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o this PR                                    1152 / 1211        694.7           1.4       1.0X
```
Performance result with this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
with this PR                                   1053 / 1121        760.0           1.3       1.0X
```
Here is a comparison of the code generated without and with this PR; only the method `agg_doAggregateWithoutKey` changes.
Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       while (scan_batchIdx < numRows) {
/* 057 */         int scan_rowIdx = scan_batchIdx++;
/* 058 */         // do aggregate
/* 059 */         // common sub-expressions
/* 060 */
/* 061 */         // evaluate aggregate function
/* 062 */         boolean agg_isNull1 = false;
/* 063 */
/* 064 */         long agg_value1 = -1L;
/* 065 */         agg_value1 = agg_bufValue + 1L;
/* 066 */         // update aggregation buffer
/* 067 */         agg_bufIsNull = false;
/* 068 */         agg_bufValue = agg_value1;
/* 069 */         if (shouldStop()) return;
/* 070 */       }
/* 071 */       scan_batch = null;
/* 072 */       scan_nextBatch();
/* 073 */     }
/* 074 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 075 */     scan_scanTime1 = 0;
/* 076 */
/* 077 */   }
/* 078 */
/* 079 */   private void scan_nextBatch() throws java.io.IOException {
/* 080 */     long getBatchStart = System.nanoTime();
/* 081 */     if (scan_input.hasNext()) {
/* 082 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 083 */       scan_numOutputRows.add(scan_batch.numRows());
/* 084 */       scan_batchIdx = 0;
/* 085 */
/* 086 */     }
/* 087 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 088 */   }
/* 089 */
/* 090 */   protected void processNext() throws java.io.IOException {
/* 091 */     while (!agg_initAgg) {
/* 092 */       agg_initAgg = true;
/* 093 */       long agg_beforeAgg = System.nanoTime();
/* 094 */       agg_doAggregateWithoutKey();
/* 095 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 096 */
/* 097 */       // output the result
/* 098 */
/* 099 */       agg_numOutputRows.add(1);
/* 100 */       agg_rowWriter.zeroOutNullBytes();
/* 101 */
/* 102 */       if (agg_bufIsNull) {
/* 103 */         agg_rowWriter.setNullAt(0);
/* 104 */       } else {
/* 105 */         agg_rowWriter.write(0, agg_bufValue);
/* 106 */       }
/* 107 */       append(agg_result);
/* 108 */     }
/* 109 */   }
/* 110 */ }
```
Generated code with this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       int scan_localEnd = numRows - scan_batchIdx;
/* 057 */       for (int scan_localIdx = 0; scan_localIdx < scan_localEnd; scan_localIdx++) {
/* 058 */         int scan_rowIdx = scan_batchIdx + scan_localIdx;
/* 059 */         // do aggregate
/* 060 */         // common sub-expressions
/* 061 */
/* 062 */         // evaluate aggregate function
/* 063 */         boolean agg_isNull1 = false;
/* 064 */
/* 065 */         long agg_value1 = -1L;
/* 066 */         agg_value1 = agg_bufValue + 1L;
/* 067 */         // update aggregation buffer
/* 068 */         agg_bufIsNull = false;
/* 069 */         agg_bufValue = agg_value1;
/* 070 */         // shouldStop check is eliminated
/* 071 */       }
/* 072 */       scan_batchIdx = numRows;
/* 073 */       scan_batch = null;
/* 074 */       scan_nextBatch();
/* 075 */     }
/* 076 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 077 */     scan_scanTime1 = 0;
/* 078 */
/* 079 */   }
/* 080 */
/* 081 */   private void scan_nextBatch() throws java.io.IOException {
/* 082 */     long getBatchStart = System.nanoTime();
/* 083 */     if (scan_input.hasNext()) {
/* 084 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 085 */       scan_numOutputRows.add(scan_batch.numRows());
/* 086 */       scan_batchIdx = 0;
/* 087 */
/* 088 */     }
/* 089 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 090 */   }
/* 091 */
/* 092 */   protected void processNext() throws java.io.IOException {
/* 093 */     while (!agg_initAgg) {
/* 094 */       agg_initAgg = true;
/* 095 */       long agg_beforeAgg = System.nanoTime();
/* 096 */       agg_doAggregateWithoutKey();
/* 097 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 098 */
/* 099 */       // output the result
/* 100 */
/* 101 */       agg_numOutputRows.add(1);
/* 102 */       agg_rowWriter.zeroOutNullBytes();
/* 103 */
/* 104 */       if (agg_bufIsNull) {
/* 105 */         agg_rowWriter.setNullAt(0);
/* 106 */       } else {
/* 107 */         agg_rowWriter.write(0, agg_bufValue);
/* 108 */       }
/* 109 */       append(agg_result);
/* 110 */     }
/* 111 */   }
/* 112 */ }
```
## How was this patch tested?
Tested with the existing test suites.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes #17378 from kiszk/SPARK-20046.