author     Kazuaki Ishizaki <ishizaki@jp.ibm.com>    2016-03-21 14:36:51 -0700
committer  Davies Liu <davies.liu@gmail.com>         2016-03-21 14:36:51 -0700
commit     f35df7d1820738cc1dac81271041707010e2f08f (patch)
tree       a83a305038d27b205c98b3ba987b1ed14284d50a
parent     9b4e15ba13f62cff302d978093633fc3181a8475 (diff)
[SPARK-13805] [SQL] Generate code that gets a value in each column from ColumnVector when ColumnarBatch is used
## What changes were proposed in this pull request?

This PR generates code that gets a value in each column from ```ColumnVector``` instead of creating ```InternalRow``` when ```ColumnarBatch``` is accessed. This PR improves the benchmark results by up to 15%.

This PR consists of two parts:

1. Get a ```ColumnVector``` by using the ```ColumnarBatch.column()``` method
2. Get a value of each column by using ```rdd_col${COLIDX}.getInt(ROWIDX)``` instead of ```rdd_row.getInt(COLIDX)```

Here is a motivating example.

````
sqlContext.conf.setConfString(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
sqlContext.conf.setConfString(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
val values = 10
withTempPath { dir =>
  withTempTable("t1", "tempTable") {
    sqlContext.range(values).registerTempTable("t1")
    sqlContext.sql("select id % 2 as p, cast(id as INT) as id from t1")
      .write.partitionBy("p").parquet(dir.getCanonicalPath)
    sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
    sqlContext.sql("select sum(p) from tempTable").collect
  }
}
````

The original code:

````java
...
/* 072 */ while (!shouldStop() && rdd_batchIdx < numRows) {
/* 073 */   InternalRow rdd_row = rdd_batch.getRow(rdd_batchIdx++);
/* 074 */   /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
/* 075 */   /* input[0, int] */
/* 076 */   boolean rdd_isNull = rdd_row.isNullAt(0);
/* 077 */   int rdd_value = rdd_isNull ? -1 : (rdd_row.getInt(0));
...
````

The code generated by this PR:

````java
/* 072 */ while (!shouldStop() && rdd_batchIdx < numRows) {
/* 073 */   org.apache.spark.sql.execution.vectorized.ColumnVector rdd_col0 = rdd_batch.column(0);
/* 074 */   /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
/* 075 */   /* input[0, int] */
/* 076 */   boolean rdd_isNull = rdd_col0.getIsNull(rdd_batchIdx);
/* 077 */   int rdd_value = rdd_isNull ? -1 : (rdd_col0.getInt(rdd_batchIdx));
...
/* 128 */   rdd_batchIdx++;
/* 129 */ }
/* 130 */ if (shouldStop()) return;
````

Performance

Without this PR:

````
model name : Intel(R) Xeon(R) CPU E5-2667 v2 3.30GHz
Partitioned Table:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Read data column                     434 /  488         36.3          27.6       1.0X
Read partition column                302 /  346         52.1          19.2       1.4X
Read both columns                    588 /  643         26.8          37.4       0.7X
````

With this PR:

````
model name : Intel(R) Xeon(R) CPU E5-2667 v2 3.30GHz
Partitioned Table:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Read data column                     392 /  516         40.1          24.9       1.0X
Read partition column                256 /  318         61.4          16.3       1.5X
Read both columns                    523 /  539         30.1          33.3       0.7X
````

## How was this patch tested?

Tested by existing test suites and the benchmark.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #11636 from kiszk/SPARK-13805.
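For reference, here is a minimal hand-written sketch (not taken from this patch; the class and method names below are illustrative) of the column-wise access pattern the generated code follows: fetch the ```ColumnVector``` once per batch, then read each value by row index instead of materializing an ```InternalRow``` per row.

````java
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

// Illustrative only: sum a nullable int column the way the generated code does.
final class ColumnarBatchSketch {
  static long sumIntColumn(ColumnarBatch batch, int ordinal) {
    ColumnVector col = batch.column(ordinal);  // fetched once per batch
    long sum = 0L;
    for (int rowId = 0; rowId < batch.numRows(); rowId++) {
      if (!col.isNullAt(rowId)) {              // getIsNull is renamed to isNullAt in this patch
        sum += col.getInt(rowId);
      }
    }
    return sum;
  }
}
````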
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java                        40
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java                    2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java                        4
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java                  2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java                   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                                  42
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala      8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala      2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala                 8
9 files changed, 77 insertions, 33 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 04adf1fb6d..13bf4c5c77 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -109,62 +109,62 @@ public abstract class ColumnVector {
if (dt instanceof BooleanType) {
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = data.getBoolean(offset + i);
}
}
} else if (dt instanceof ByteType) {
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = data.getByte(offset + i);
}
}
} else if (dt instanceof ShortType) {
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = data.getShort(offset + i);
}
}
} else if (dt instanceof IntegerType) {
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = data.getInt(offset + i);
}
}
} else if (dt instanceof FloatType) {
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = data.getFloat(offset + i);
}
}
} else if (dt instanceof DoubleType) {
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = data.getDouble(offset + i);
}
}
} else if (dt instanceof LongType) {
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = data.getLong(offset + i);
}
}
} else if (dt instanceof DecimalType) {
DecimalType decType = (DecimalType)dt;
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = getDecimal(i, decType.precision(), decType.scale());
}
}
} else if (dt instanceof StringType) {
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = getUTF8String(i).toString();
}
}
} else if (dt instanceof CalendarIntervalType) {
for (int i = 0; i < length; i++) {
- if (!data.getIsNull(offset + i)) {
+ if (!data.isNullAt(offset + i)) {
list[i] = getInterval(i);
}
}
@@ -175,7 +175,7 @@ public abstract class ColumnVector {
}
@Override
- public boolean isNullAt(int ordinal) { return data.getIsNull(offset + ordinal); }
+ public boolean isNullAt(int ordinal) { return data.isNullAt(offset + ordinal); }
@Override
public boolean getBoolean(int ordinal) {
@@ -314,7 +314,7 @@ public abstract class ColumnVector {
/**
* Returns whether the value at rowId is NULL.
*/
- public abstract boolean getIsNull(int rowId);
+ public abstract boolean isNullAt(int rowId);
/**
* Sets the value at rowId to `value`.
@@ -501,6 +501,15 @@ public abstract class ColumnVector {
}
/**
+ * Returns a utility object to get structs.
+ * provided to keep API compatibility with InternalRow for code generation
+ */
+ public ColumnarBatch.Row getStruct(int rowId, int size) {
+ resultStruct.rowId = rowId;
+ return resultStruct;
+ }
+
+ /**
* Returns the array at rowid.
*/
public final Array getArray(int rowId) {
@@ -532,6 +541,13 @@ public abstract class ColumnVector {
}
/**
+ * Returns the value for rowId.
+ */
+ public MapData getMap(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ /**
* Returns the decimal for rowId.
*/
public final Decimal getDecimal(int rowId, int precision, int scale) {
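A hedged illustration of the API-compatibility point above: after this patch, both ```InternalRow``` and ```ColumnVector``` expose ```isNullAt``` and a two-argument ```getStruct```, so generated code can emit the same call shape whether it reads row-wise or column-wise. The helper names below are hypothetical and exist only for illustration.

````java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVector;

// Hypothetical helpers: the accessor shape is identical whether the source is an
// InternalRow (ordinal-based) or a ColumnVector (rowId-based).
final class AccessorShapeSketch {
  static InternalRow structFromRow(InternalRow row, int ordinal, int numFields) {
    return row.getStruct(ordinal, numFields);
  }

  static InternalRow structFromColumn(ColumnVector col, int rowId, int numFields) {
    // the size argument is not used by ColumnVector; it mirrors InternalRow's signature
    return col.getStruct(rowId, numFields);
  }
}
````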
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index b084eda6f8..2dc57dc50d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -105,7 +105,7 @@ public class ColumnVectorUtils {
int[] result = new int[array.length];
ColumnVector data = array.data;
for (int i = 0; i < result.length; i++) {
- if (data.getIsNull(array.offset + i)) {
+ if (data.isNullAt(array.offset + i)) {
throw new RuntimeException("Cannot handle NULL values.");
}
result[i] = data.getInt(array.offset + i);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index b6fa9a0b9e..7ab4cda5a4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -168,7 +168,7 @@ public final class ColumnarBatch {
}
@Override
- public boolean isNullAt(int ordinal) { return columns[ordinal].getIsNull(rowId); }
+ public boolean isNullAt(int ordinal) { return columns[ordinal].isNullAt(rowId); }
@Override
public boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); }
@@ -295,7 +295,7 @@ public final class ColumnarBatch {
for (int ordinal : nullFilteredColumns) {
if (columns[ordinal].numNulls != 0) {
for (int rowId = 0; rowId < numRows; rowId++) {
- if (!filteredRows[rowId] && columns[ordinal].getIsNull(rowId)) {
+ if (!filteredRows[rowId] && columns[ordinal].isNullAt(rowId)) {
filteredRows[rowId] = true;
++numRowsFiltered;
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index d5a9163274..689e6a2a6d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -109,7 +109,7 @@ public final class OffHeapColumnVector extends ColumnVector {
}
@Override
- public boolean getIsNull(int rowId) {
+ public boolean isNullAt(int rowId) {
return Platform.getByte(null, nulls + rowId) == 1;
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 5b671a7432..f332e87016 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -98,7 +98,7 @@ public final class OnHeapColumnVector extends ColumnVector {
}
@Override
- public boolean getIsNull(int rowId) {
+ public boolean isNullAt(int rowId) {
return nulls[rowId] == 1;
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index e97c6be7f1..b4348d39c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -22,9 +22,10 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.toCommentSafeString
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
@@ -195,19 +196,42 @@ private[sql] case class DataSourceScan(
rdd :: Nil
}
+ private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
+ dataType: DataType, nullable: Boolean): ExprCode = {
+ val javaType = ctx.javaType(dataType)
+ val value = ctx.getValue(columnVar, dataType, ordinal)
+ val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+ val valueVar = ctx.freshName("value")
+ val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+ val code = s"/* ${toCommentSafeString(str)} */\n" + (if (nullable) {
+ s"""
+ boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+ $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+ """
+ } else {
+ s"$javaType ${valueVar} = $value;"
+ }).trim
+ ExprCode(code, isNullVar, valueVar)
+ }
+
// Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
// never requires UnsafeRow as input.
override protected def doProduce(ctx: CodegenContext): String = {
val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+ val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
val input = ctx.freshName("input")
val idx = ctx.freshName("batchIdx")
+ val rowidx = ctx.freshName("rowIdx")
val batch = ctx.freshName("batch")
// PhysicalRDD always just has one input
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
ctx.addMutableState("int", idx, s"$idx = 0;")
+ val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
+ val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
+ ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+ s"$name = ${batch}.column($i);" }
- val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
val row = ctx.freshName("row")
val numOutputRows = metricTerm(ctx, "numOutputRows")
@@ -217,19 +241,22 @@ private[sql] case class DataSourceScan(
// TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
// here which path to use. Fix this.
- ctx.INPUT_ROW = row
ctx.currentVars = null
- val columns1 = exprs.map(_.gen(ctx))
+ val columns1 = (output zip colVars).map { case (attr, colVar) =>
+ genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
val scanBatches = ctx.freshName("processBatches")
ctx.addNewFunction(scanBatches,
s"""
| private void $scanBatches() throws java.io.IOException {
| while (true) {
| int numRows = $batch.numRows();
- | if ($idx == 0) $numOutputRows.add(numRows);
+ | if ($idx == 0) {
+ | ${columnAssigns.mkString("", "\n", "\n")}
+ | $numOutputRows.add(numRows);
+ | }
|
| while (!shouldStop() && $idx < numRows) {
- | InternalRow $row = $batch.getRow($idx++);
+ | int $rowidx = $idx++;
| ${consume(ctx, columns1).trim}
| }
| if (shouldStop()) return;
@@ -243,9 +270,10 @@ private[sql] case class DataSourceScan(
| }
| }""".stripMargin)
+ val exprRows = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
ctx.INPUT_ROW = row
ctx.currentVars = null
- val columns2 = exprs.map(_.gen(ctx))
+ val columns2 = exprRows.map(_.gen(ctx))
val inputRow = if (outputUnsafeRows) row else null
val scanRows = ctx.freshName("processRows")
ctx.addNewFunction(scanRows,
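For concreteness, a hedged example of what ```genCodeColumnVector``` emits for a single nullable ```int``` column; the fresh variable names below are illustrative, not exact codegen output.

````java
/* columnVector[colInstance0, rowIdx, int] */
boolean isNull1 = colInstance0.isNullAt(rowIdx);
int value2 = isNull1 ? -1 : (colInstance0.getInt(rowIdx));
````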
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index f42c7546c4..88fcfce0ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -68,10 +68,10 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
assert(batch.numRows() == n)
var i = 0
while (i < n) {
- assert(batch.column(0).getIsNull(i))
- assert(batch.column(1).getIsNull(i))
- assert(batch.column(2).getIsNull(i))
- assert(batch.column(3).getIsNull(i))
+ assert(batch.column(0).isNullAt(i))
+ assert(batch.column(1).isNullAt(i))
+ assert(batch.column(2).isNullAt(i))
+ assert(batch.column(3).isNullAt(i))
i += 1
}
reader.close()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 070c4004c4..cc0cc65d3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -101,7 +101,7 @@ object ParquetReadBenchmark {
val numRows = batch.numRows()
var i = 0
while (i < numRows) {
- if (!col.getIsNull(i)) sum += col.getInt(i)
+ if (!col.isNullAt(i)) sum += col.getInt(i)
i += 1
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index fa2c74431a..4262097e8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -68,7 +68,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(column.numNulls() == 4)
reference.zipWithIndex.foreach { v =>
- assert(v._1 == column.getIsNull(v._2))
+ assert(v._1 == column.isNullAt(v._2))
if (memMode == MemoryMode.OFF_HEAP) {
val addr = column.nullsNativeAddress()
assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
@@ -489,10 +489,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(batch.rowIterator().hasNext == true)
assert(batch.column(0).getInt(0) == 1)
- assert(batch.column(0).getIsNull(0) == false)
+ assert(batch.column(0).isNullAt(0) == false)
assert(batch.column(1).getDouble(0) == 1.1)
- assert(batch.column(1).getIsNull(0) == false)
- assert(batch.column(2).getIsNull(0) == true)
+ assert(batch.column(1).isNullAt(0) == false)
+ assert(batch.column(2).isNullAt(0) == true)
assert(batch.column(3).getUTF8String(0).toString == "Hello")
// Verify the iterator works correctly.