diff options
author | tedyu <yuzhihong@gmail.com> | 2016-04-08 12:25:36 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-04-08 12:25:36 -0700 |
commit | 02757535b58069ce8258108d89d8172a53c358e5 (patch) | |
tree | b22c90e2ea545395e280efe704386a6a462437de /sql/core/src/main/java | |
parent | 56af8e85cca056096fe4e765d8d287e0f9efc0d2 (diff) | |
download | spark-02757535b58069ce8258108d89d8172a53c358e5.tar.gz spark-02757535b58069ce8258108d89d8172a53c358e5.tar.bz2 spark-02757535b58069ce8258108d89d8172a53c358e5.zip |
[SPARK-14448] Improvements to ColumnVector
## What changes were proposed in this pull request?
In this PR, two changes are proposed for ColumnVector :
1. ColumnVector should be declared as implementing AutoCloseable - it already has close() method
2. In OnHeapColumnVector#reserveInternal(), we only need to allocate new array when existing array is null or the length of existing array is shorter than the newCapacity.
## How was this patch tested?
Existing unit tests.
Author: tedyu <yuzhihong@gmail.com>
Closes #12225 from tedyu/master.
Diffstat (limited to 'sql/core/src/main/java')
-rw-r--r-- | sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java | 2 | ||||
-rw-r--r-- | sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java | 56 |
2 files changed, 36 insertions, 22 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index d5daaf99df..0b276e6c77 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -56,7 +56,7 @@ import org.apache.spark.unsafe.types.UTF8String; * * ColumnVectors are intended to be reused. */ -public abstract class ColumnVector { +public abstract class ColumnVector implements AutoCloseable { /** * Allocates a column to store elements of `type` on or off heap. * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 708a00953a..e97276800d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -387,35 +387,49 @@ public final class OnHeapColumnVector extends ColumnVector { arrayLengths = newLengths; arrayOffsets = newOffsets; } else if (type instanceof BooleanType) { - byte[] newData = new byte[newCapacity]; - if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended); - byteData = newData; + if (byteData == null || byteData.length < newCapacity) { + byte[] newData = new byte[newCapacity]; + if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended); + byteData = newData; + } } else if (type instanceof ByteType) { - byte[] newData = new byte[newCapacity]; - if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended); - byteData = newData; + if (byteData == null || byteData.length < newCapacity) { + byte[] newData = new byte[newCapacity]; + if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended); + byteData = newData; + } } else if (type instanceof ShortType) { - short[] newData = new short[newCapacity]; - if (shortData != null) System.arraycopy(shortData, 0, newData, 0, elementsAppended); - shortData = newData; + if (shortData == null || shortData.length < newCapacity) { + short[] newData = new short[newCapacity]; + if (shortData != null) System.arraycopy(shortData, 0, newData, 0, elementsAppended); + shortData = newData; + } } else if (type instanceof IntegerType || type instanceof DateType || DecimalType.is32BitDecimalType(type)) { - int[] newData = new int[newCapacity]; - if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended); - intData = newData; + if (intData == null || intData.length < newCapacity) { + int[] newData = new int[newCapacity]; + if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended); + intData = newData; + } } else if (type instanceof LongType || type instanceof TimestampType || DecimalType.is64BitDecimalType(type)) { - long[] newData = new long[newCapacity]; - if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended); - longData = newData; + if (longData == null || longData.length < newCapacity) { + long[] newData = new long[newCapacity]; + if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended); + longData = newData; + } } else if (type instanceof FloatType) { - float[] newData = new float[newCapacity]; - if (floatData != null) System.arraycopy(floatData, 0, newData, 0, elementsAppended); - floatData = newData; + if (floatData == null || floatData.length < newCapacity) { + float[] newData = new float[newCapacity]; + if (floatData != null) System.arraycopy(floatData, 0, newData, 0, elementsAppended); + floatData = newData; + } } else if (type instanceof DoubleType) { - double[] newData = new double[newCapacity]; - if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, elementsAppended); - doubleData = newData; + if (doubleData == null || doubleData.length < newCapacity) { + double[] newData = new double[newCapacity]; + if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, elementsAppended); + doubleData = newData; + } } else if (resultStruct != null) { // Nothing to store. } else { |