author    | Kazuaki Ishizaki <ishizaki@jp.ibm.com> | 2016-03-21 14:36:51 -0700
committer | Davies Liu <davies.liu@gmail.com>      | 2016-03-21 14:36:51 -0700
commit    | f35df7d1820738cc1dac81271041707010e2f08f (patch)
tree      | a83a305038d27b205c98b3ba987b1ed14284d50a /sql
parent    | 9b4e15ba13f62cff302d978093633fc3181a8475 (diff)
[SPARK-13805] [SQL] Generate code that gets a value in each column from ColumnVector when ColumnarBatch is used
## What changes were proposed in this pull request?
This PR generates code that gets a value in each column from ```ColumnVector``` instead of creating an ```InternalRow``` when a ```ColumnarBatch``` is accessed. It improves the benchmark results by up to 15%.
This PR consists of two parts:
1. Get a ```ColumnVector``` by using the ```ColumnarBatch.column()``` method
2. Get a value from each column by using ```rdd_col${COLIDX}.getInt(ROWIDX)``` instead of ```rdd_row.getInt(COLIDX)```
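In other words, instead of materializing an ```InternalRow``` per row, the generated loop fetches each ```ColumnVector``` once per batch and reads values directly from it. The following is a minimal hand-written Java sketch of that access pattern — not code from the patch itself — assuming the ```ColumnarBatch``` comes from the vectorized Parquet reader and that column 0 is a nullable INT:
````java
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

final class ColumnAccessSketch {
  // Sums an INT column by reading straight from the vector,
  // without going through a per-row InternalRow view.
  static long sumColumn(ColumnarBatch batch) {
    ColumnVector col0 = batch.column(0);   // step 1: fetch the vector once per batch
    long sum = 0;
    for (int rowIdx = 0; rowIdx < batch.numRows(); rowIdx++) {
      if (!col0.isNullAt(rowIdx)) {        // step 2: null check and read go to the vector
        sum += col0.getInt(rowIdx);
      }
    }
    return sum;
  }
}
````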
Here is a motivating example (a partitioned Parquet table read through the vectorized reader with whole-stage codegen enabled):
````
sqlContext.conf.setConfString(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
sqlContext.conf.setConfString(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")

val values = 10
withTempPath { dir =>
  withTempTable("t1", "tempTable") {
    sqlContext.range(values).registerTempTable("t1")
    sqlContext.sql("select id % 2 as p, cast(id as INT) as id from t1")
      .write.partitionBy("p").parquet(dir.getCanonicalPath)
    sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
    sqlContext.sql("select sum(p) from tempTable").collect
  }
}
````
The code generated before this PR:
````java
...
/* 072 */ while (!shouldStop() && rdd_batchIdx < numRows) {
/* 073 */ InternalRow rdd_row = rdd_batch.getRow(rdd_batchIdx++);
/* 074 */ /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
/* 075 */ /* input[0, int] */
/* 076 */ boolean rdd_isNull = rdd_row.isNullAt(0);
/* 077 */ int rdd_value = rdd_isNull ? -1 : (rdd_row.getInt(0));
...
````
The code generated by this PR:
````java
/* 072 */ while (!shouldStop() && rdd_batchIdx < numRows) {
/* 073 */ org.apache.spark.sql.execution.vectorized.ColumnVector rdd_col0 = rdd_batch.column(0);
/* 074 */ /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
/* 075 */ /* input[0, int] */
/* 076 */ boolean rdd_isNull = rdd_col0.getIsNull(rdd_batchIdx);
/* 077 */ int rdd_value = rdd_isNull ? -1 : (rdd_col0.getInt(rdd_batchIdx));
...
/* 128 */ rdd_batchIdx++;
/* 129 */ }
/* 130 */ if (shouldStop()) return;
````
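Note that in the committed version of the patch the ```column(i)``` lookups are hoisted even further than this snippet suggests: the generated ```processBatches``` function assigns the ```ColumnVector``` fields once per batch, inside an ```if (batchIdx == 0)``` block, rather than on every loop iteration (see the ```ExistingRDD.scala``` hunk in the diff below).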
Performance
Without this PR
````
model name : Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz
Partitioned Table:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Read data column                          434 /  488         36.3          27.6       1.0X
Read partition column                     302 /  346         52.1          19.2       1.4X
Read both columns                         588 /  643         26.8          37.4       0.7X
````
With this PR
````
model name : Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz
Partitioned Table:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Read data column                          392 /  516         40.1          24.9       1.0X
Read partition column                     256 /  318         61.4          16.3       1.5X
Read both columns                         523 /  539         30.1          33.3       0.7X
````
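Reading the speedup off the ```Per Row(ns)``` column: the partition-column read drops from 19.2 ns to 16.3 ns per row (1 - 16.3/19.2 ≈ 15%), and the data-column read from 27.6 ns to 24.9 ns (≈ 10%), which is where the "up to 15%" figure above comes from.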
## How was this patch tested?
Tested with the existing test suites and the benchmark above.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes #11636 from kiszk/SPARK-13805.
Diffstat (limited to 'sql')
9 files changed, 77 insertions, 33 deletions
````diff
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 04adf1fb6d..13bf4c5c77 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -109,62 +109,62 @@ public abstract class ColumnVector {
       if (dt instanceof BooleanType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getBoolean(offset + i);
           }
         }
       } else if (dt instanceof ByteType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getByte(offset + i);
           }
         }
       } else if (dt instanceof ShortType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getShort(offset + i);
           }
         }
       } else if (dt instanceof IntegerType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getInt(offset + i);
           }
         }
       } else if (dt instanceof FloatType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getFloat(offset + i);
           }
         }
       } else if (dt instanceof DoubleType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getDouble(offset + i);
           }
         }
       } else if (dt instanceof LongType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getLong(offset + i);
           }
         }
       } else if (dt instanceof DecimalType) {
         DecimalType decType = (DecimalType)dt;
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = getDecimal(i, decType.precision(), decType.scale());
           }
         }
       } else if (dt instanceof StringType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = getUTF8String(i).toString();
           }
         }
       } else if (dt instanceof CalendarIntervalType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = getInterval(i);
           }
         }
@@ -175,7 +175,7 @@ public abstract class ColumnVector {
     }

     @Override
-    public boolean isNullAt(int ordinal) { return data.getIsNull(offset + ordinal); }
+    public boolean isNullAt(int ordinal) { return data.isNullAt(offset + ordinal); }

     @Override
     public boolean getBoolean(int ordinal) {
@@ -314,7 +314,7 @@ public abstract class ColumnVector {
   /**
    * Returns whether the value at rowId is NULL.
    */
-  public abstract boolean getIsNull(int rowId);
+  public abstract boolean isNullAt(int rowId);

   /**
    * Sets the value at rowId to `value`.
@@ -501,6 +501,15 @@ public abstract class ColumnVector {
   }

   /**
+   * Returns a utility object to get structs.
+   * provided to keep API compabilitity with InternalRow for code generation
+   */
+  public ColumnarBatch.Row getStruct(int rowId, int size) {
+    resultStruct.rowId = rowId;
+    return resultStruct;
+  }
+
+  /**
    * Returns the array at rowid.
    */
   public final Array getArray(int rowId) {
@@ -532,6 +541,13 @@ public abstract class ColumnVector {
   }

   /**
+   * Returns the value for rowId.
+   */
+  public MapData getMap(int ordinal) {
+    throw new NotImplementedException();
+  }
+
+  /**
    * Returns the decimal for rowId.
    */
   public final Decimal getDecimal(int rowId, int precision, int scale) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index b084eda6f8..2dc57dc50d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -105,7 +105,7 @@ public class ColumnVectorUtils {
     int[] result = new int[array.length];
     ColumnVector data = array.data;
     for (int i = 0; i < result.length; i++) {
-      if (data.getIsNull(array.offset + i)) {
+      if (data.isNullAt(array.offset + i)) {
         throw new RuntimeException("Cannot handle NULL values.");
       }
       result[i] = data.getInt(array.offset + i);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index b6fa9a0b9e..7ab4cda5a4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -168,7 +168,7 @@ public final class ColumnarBatch {
     }

     @Override
-    public boolean isNullAt(int ordinal) { return columns[ordinal].getIsNull(rowId); }
+    public boolean isNullAt(int ordinal) { return columns[ordinal].isNullAt(rowId); }

     @Override
     public boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); }
@@ -295,7 +295,7 @@ public final class ColumnarBatch {
     for (int ordinal : nullFilteredColumns) {
       if (columns[ordinal].numNulls != 0) {
         for (int rowId = 0; rowId < numRows; rowId++) {
-          if (!filteredRows[rowId] && columns[ordinal].getIsNull(rowId)) {
+          if (!filteredRows[rowId] && columns[ordinal].isNullAt(rowId)) {
             filteredRows[rowId] = true;
             ++numRowsFiltered;
           }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index d5a9163274..689e6a2a6d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -109,7 +109,7 @@ public final class OffHeapColumnVector extends ColumnVector {
   }

   @Override
-  public boolean getIsNull(int rowId) {
+  public boolean isNullAt(int rowId) {
     return Platform.getByte(null, nulls + rowId) == 1;
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 5b671a7432..f332e87016 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -98,7 +98,7 @@ public final class OnHeapColumnVector extends ColumnVector {
   }

   @Override
-  public boolean getIsNull(int rowId) {
+  public boolean isNullAt(int rowId) {
     return nulls[rowId] == 1;
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index e97c6be7f1..b4348d39c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -22,9 +22,10 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.toCommentSafeString
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
@@ -195,19 +196,42 @@ private[sql] case class DataSourceScan(
     rdd :: Nil
   }

+  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
+    dataType: DataType, nullable: Boolean): ExprCode = {
+    val javaType = ctx.javaType(dataType)
+    val value = ctx.getValue(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = s"/* ${toCommentSafeString(str)} */\n" + (if (nullable) {
+      s"""
+        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      s"$javaType ${valueVar} = $value;"
+    }).trim
+    ExprCode(code, isNullVar, valueVar)
+  }
+
   // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
   // never requires UnsafeRow as input.
   override protected def doProduce(ctx: CodegenContext): String = {
     val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
     val input = ctx.freshName("input")
     val idx = ctx.freshName("batchIdx")
+    val rowidx = ctx.freshName("rowIdx")
     val batch = ctx.freshName("batch")
     // PhysicalRDD always just has one input
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
     ctx.addMutableState("int", idx, s"$idx = 0;")
+    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
+    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
+      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+      s"$name = ${batch}.column($i);" }

-    val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
     val row = ctx.freshName("row")
     val numOutputRows = metricTerm(ctx, "numOutputRows")
@@ -217,19 +241,22 @@ private[sql] case class DataSourceScan(
     // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
     // here which path to use. Fix this.

-    ctx.INPUT_ROW = row
     ctx.currentVars = null
-    val columns1 = exprs.map(_.gen(ctx))
+    val columns1 = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
     val scanBatches = ctx.freshName("processBatches")
     ctx.addNewFunction(scanBatches,
       s"""
       | private void $scanBatches() throws java.io.IOException {
       |  while (true) {
       |     int numRows = $batch.numRows();
-      |     if ($idx == 0) $numOutputRows.add(numRows);
+      |     if ($idx == 0) {
+      |       ${columnAssigns.mkString("", "\n", "\n")}
+      |       $numOutputRows.add(numRows);
+      |     }
       |
       |     while (!shouldStop() && $idx < numRows) {
-      |       InternalRow $row = $batch.getRow($idx++);
+      |       int $rowidx = $idx++;
       |       ${consume(ctx, columns1).trim}
       |     }
       |     if (shouldStop()) return;
@@ -243,9 +270,10 @@ private[sql] case class DataSourceScan(
       |   }
       | }""".stripMargin)

+    val exprRows = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
     ctx.INPUT_ROW = row
     ctx.currentVars = null
-    val columns2 = exprs.map(_.gen(ctx))
+    val columns2 = exprRows.map(_.gen(ctx))
     val inputRow = if (outputUnsafeRows) row else null
     val scanRows = ctx.freshName("processRows")
     ctx.addNewFunction(scanRows,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index f42c7546c4..88fcfce0ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -68,10 +68,10 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
       assert(batch.numRows() == n)
       var i = 0
       while (i < n) {
-        assert(batch.column(0).getIsNull(i))
-        assert(batch.column(1).getIsNull(i))
-        assert(batch.column(2).getIsNull(i))
-        assert(batch.column(3).getIsNull(i))
+        assert(batch.column(0).isNullAt(i))
+        assert(batch.column(1).isNullAt(i))
+        assert(batch.column(2).isNullAt(i))
+        assert(batch.column(3).isNullAt(i))
         i += 1
       }
       reader.close()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 070c4004c4..cc0cc65d3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -101,7 +101,7 @@ object ParquetReadBenchmark {
             val numRows = batch.numRows()
             var i = 0
             while (i < numRows) {
-              if (!col.getIsNull(i)) sum += col.getInt(i)
+              if (!col.isNullAt(i)) sum += col.getInt(i)
               i += 1
             }
           }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index fa2c74431a..4262097e8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -68,7 +68,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.numNulls() == 4)

       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getIsNull(v._2))
+        assert(v._1 == column.isNullAt(v._2))
         if (memMode == MemoryMode.OFF_HEAP) {
           val addr = column.nullsNativeAddress()
           assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
@@ -489,10 +489,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.rowIterator().hasNext == true)

       assert(batch.column(0).getInt(0) == 1)
-      assert(batch.column(0).getIsNull(0) == false)
+      assert(batch.column(0).isNullAt(0) == false)
       assert(batch.column(1).getDouble(0) == 1.1)
-      assert(batch.column(1).getIsNull(0) == false)
-      assert(batch.column(2).getIsNull(0) == true)
+      assert(batch.column(1).isNullAt(0) == false)
+      assert(batch.column(2).isNullAt(0) == true)
       assert(batch.column(3).getUTF8String(0).toString == "Hello")

       // Verify the iterator works correctly.
````
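To make the ```genCodeColumnVector``` helper above concrete: for a nullable INT column it emits code of the following shape, while for a non-nullable column it collapses to the single assignment with no null check. The identifier names below are illustrative only; the real ones come from ```ctx.freshName```:
````java
/* columnVector[rdd_col0, rdd_rowIdx, int] */
boolean rdd_isNull = rdd_col0.isNullAt(rdd_rowIdx);
// -1 is ctx.defaultValue(dataType) for INT columns
int rdd_value = rdd_isNull ? -1 : (rdd_col0.getInt(rdd_rowIdx));
````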