author     Qifan Pu <qifan.pu@gmail.com>       2016-08-10 14:45:13 -0700
committer  Davies Liu <davies.liu@gmail.com>   2016-08-10 14:45:13 -0700
commit     bf5cb8af4a649e0c7ac565891427484eab9ee5d9 (patch)
tree       06b431129766ecb19c5d1c36999e3d3825383cda /sql
parent     214ba66a030bc3a718c567a742b0db44bf911d61 (diff)
[SPARK-16928] [SQL] Recursive call of ColumnVector::getInt() breaks JIT inlining
## What changes were proposed in this pull request?

In both `OnHeapColumnVector` and `OffHeapColumnVector`, we implemented `getInt()` with the following code pattern:

```java
public int getInt(int rowId) {
  if (dictionary == null) {
    return intData[rowId];
  } else {
    return dictionary.decodeToInt(dictionaryIds.getInt(rowId));
  }
}
```

As `dictionaryIds` is also a `ColumnVector`, this results in a recursive call of `getInt()` and breaks JIT inlining; as a result, `getInt()` will not get inlined. We fix this by adding a separate method, `getDictId()`, specifically for `dictionaryIds` to use.

## How was this patch tested?

We measured the difference with the following aggregate query on a TPCDS dataset (scale factor = 5):

```sql
select max(ss_sold_date_sk) as max_ss_sold_date_sk
from store_sales
```

The query runtime improved from 202ms (before) to 159ms (after).

Author: Qifan Pu <qifan.pu@gmail.com>

Closes #14513 from ooq/SPARK-16928.
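The fix keeps both methods non-recursive. As a minimal sketch (mirroring the `OnHeapColumnVector` hunk in the diff below), `getInt()` now reads dictionary ids through `getDictId()`, which has no dictionary branch of its own, so the JIT can inline the whole chain:

```java
// Sketch of the fixed pattern, mirroring the OnHeapColumnVector change below.
// getInt() no longer calls itself through dictionaryIds; getDictId() has no
// dictionary branch, so neither method is recursive and both can be inlined.
public int getInt(int rowId) {
  if (dictionary == null) {
    return intData[rowId];
  } else {
    return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
  }
}

public int getDictId(int rowId) {
  assert(dictionary == null)
    : "A ColumnVector dictionary should not have a dictionary for itself.";
  return intData[rowId];
}
```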
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java | 22
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java | 11
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java | 23
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java | 23
4 files changed, 54 insertions(+), 25 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 6c47dc09a8..4ed59b08a4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -221,15 +221,15 @@ public class VectorizedColumnReader {
if (column.dataType() == DataTypes.IntegerType ||
DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
+ column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
}
} else if (column.dataType() == DataTypes.ByteType) {
for (int i = rowId; i < rowId + num; ++i) {
- column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+ column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
}
} else if (column.dataType() == DataTypes.ShortType) {
for (int i = rowId; i < rowId + num; ++i) {
- column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+ column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -240,7 +240,7 @@ public class VectorizedColumnReader {
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
+ column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -249,20 +249,20 @@ public class VectorizedColumnReader {
case FLOAT:
for (int i = rowId; i < rowId + num; ++i) {
- column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i)));
+ column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
}
break;
case DOUBLE:
for (int i = rowId; i < rowId + num; ++i) {
- column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i)));
+ column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
}
break;
case INT96:
if (column.dataType() == DataTypes.TimestampType) {
for (int i = rowId; i < rowId + num; ++i) {
// TODO: Convert dictionary of Binaries to dictionary of Longs
- Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
}
} else {
@@ -275,7 +275,7 @@ public class VectorizedColumnReader {
// and reuse it across batches. This should mean adding a ByteArray would just update
// the length and offset.
for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
column.putByteArray(i, v.getBytes());
}
break;
@@ -283,17 +283,17 @@ public class VectorizedColumnReader {
// DecimalType written in the legacy mode
if (DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
}
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
}
} else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
column.putByteArray(i, v.getBytes());
}
} else {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 59173d253b..a7cb3b11f6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -429,6 +429,13 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract int getInt(int rowId);
/**
+ * Returns the dictionary Id for rowId.
+ * This should only be called when the ColumnVector is dictionaryIds.
+ * We have this separate method for dictionaryIds as per SPARK-16928.
+ */
+ public abstract int getDictId(int rowId);
+
+ /**
* Sets the value at rowId to `value`.
*/
public abstract void putLong(int rowId, long value);
@@ -615,7 +622,7 @@ public abstract class ColumnVector implements AutoCloseable {
ColumnVector.Array a = getByteArray(rowId);
return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
} else {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(rowId));
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
return UTF8String.fromBytes(v.getBytes());
}
}
@@ -630,7 +637,7 @@ public abstract class ColumnVector implements AutoCloseable {
System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
return bytes;
} else {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(rowId));
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
return v.getBytes();
}
}
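For clarity, a small usage sketch of the new abstract method (illustrative only; it restates the `VectorizedColumnReader` loop above, with `ids` standing in for the vector's internal `dictionaryIds`):

```java
// Illustrative sketch, not part of the patch: the intended calling pattern.
// getDictId() must only be called on the dictionaryIds vector itself, which
// never has a dictionary of its own; the OnHeap/OffHeap implementations
// below assert exactly that.
for (int i = rowId; i < rowId + num; ++i) {
  int dictId = ids.getDictId(i);                    // raw id, no dictionary branch
  column.putInt(i, dictionary.decodeToInt(dictId)); // decode through the dictionary
}
```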
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 913a05a0aa..12fa109cec 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -161,7 +161,7 @@ public final class OffHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return Platform.getByte(null, data + rowId);
} else {
- return (byte) dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+ return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
}
}
@@ -193,7 +193,7 @@ public final class OffHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return Platform.getShort(null, data + 2 * rowId);
} else {
- return (short) dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+ return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
}
}
@@ -240,10 +240,21 @@ public final class OffHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return Platform.getInt(null, data + 4 * rowId);
} else {
- return dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+ return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
}
}
+ /**
+ * Returns the dictionary Id for rowId.
+ * This should only be called when the ColumnVector is dictionaryIds.
+ * We have this separate method for dictionaryIds as per SPARK-16928.
+ */
+ public int getDictId(int rowId) {
+ assert(dictionary == null)
+ : "A ColumnVector dictionary should not have a dictionary for itself.";
+ return Platform.getInt(null, data + 4 * rowId);
+ }
+
//
// APIs dealing with Longs
//
@@ -287,7 +298,7 @@ public final class OffHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return Platform.getLong(null, data + 8 * rowId);
} else {
- return dictionary.decodeToLong(dictionaryIds.getInt(rowId));
+ return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
}
}
@@ -333,7 +344,7 @@ public final class OffHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return Platform.getFloat(null, data + rowId * 4);
} else {
- return dictionary.decodeToFloat(dictionaryIds.getInt(rowId));
+ return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
}
}
@@ -380,7 +391,7 @@ public final class OffHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return Platform.getDouble(null, data + rowId * 8);
} else {
- return dictionary.decodeToDouble(dictionaryIds.getInt(rowId));
+ return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 85067df4eb..9b410bacff 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -158,7 +158,7 @@ public final class OnHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return byteData[rowId];
} else {
- return (byte) dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+ return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
}
}
@@ -188,7 +188,7 @@ public final class OnHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return shortData[rowId];
} else {
- return (short) dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+ return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
}
}
@@ -230,10 +230,21 @@ public final class OnHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return intData[rowId];
} else {
- return dictionary.decodeToInt(dictionaryIds.getInt(rowId));
+ return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
}
}
+ /**
+ * Returns the dictionary Id for rowId.
+ * This should only be called when the ColumnVector is dictionaryIds.
+ * We have this separate method for dictionaryIds as per SPARK-16928.
+ */
+ public int getDictId(int rowId) {
+ assert(dictionary == null)
+ : "A ColumnVector dictionary should not have a dictionary for itself.";
+ return intData[rowId];
+ }
+
//
// APIs dealing with Longs
//
@@ -271,7 +282,7 @@ public final class OnHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return longData[rowId];
} else {
- return dictionary.decodeToLong(dictionaryIds.getInt(rowId));
+ return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
}
}
@@ -310,7 +321,7 @@ public final class OnHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return floatData[rowId];
} else {
- return dictionary.decodeToFloat(dictionaryIds.getInt(rowId));
+ return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
}
}
@@ -351,7 +362,7 @@ public final class OnHeapColumnVector extends ColumnVector {
if (dictionary == null) {
return doubleData[rowId];
} else {
- return dictionary.decodeToDouble(dictionaryIds.getInt(rowId));
+ return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
}
}
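A suggested way to verify the inlining claim (an assumption about methodology; the commit itself only reports the query timings): HotSpot declines to inline a method into itself once the recursion depth exceeds `-XX:MaxRecursiveInlineLevel` (default 1), which is why the old self-call through `dictionaryIds.getInt()` blocked inlining. Running the benchmark JVM with `-XX:+UnlockDiagnosticVMOptions -XX:+PrintInlining` should show `getInt` rejected with a message such as `recursively inlining too deep` before this patch, and inlined normally after it.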