aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authortedyu <yuzhihong@gmail.com>2016-04-08 12:25:36 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-08 12:25:36 -0700
commit02757535b58069ce8258108d89d8172a53c358e5 (patch)
treeb22c90e2ea545395e280efe704386a6a462437de /sql
parent56af8e85cca056096fe4e765d8d287e0f9efc0d2 (diff)
downloadspark-02757535b58069ce8258108d89d8172a53c358e5.tar.gz
spark-02757535b58069ce8258108d89d8172a53c358e5.tar.bz2
spark-02757535b58069ce8258108d89d8172a53c358e5.zip
[SPARK-14448] Improvements to ColumnVector
## What changes were proposed in this pull request? In this PR, two changes are proposed for ColumnVector : 1. ColumnVector should be declared as implementing AutoCloseable - it already has close() method 2. In OnHeapColumnVector#reserveInternal(), we only need to allocate new array when existing array is null or the length of existing array is shorter than the newCapacity. ## How was this patch tested? Existing unit tests. Author: tedyu <yuzhihong@gmail.com> Closes #12225 from tedyu/master.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java2
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java56
2 files changed, 36 insertions, 22 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index d5daaf99df..0b276e6c77 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -56,7 +56,7 @@ import org.apache.spark.unsafe.types.UTF8String;
*
* ColumnVectors are intended to be reused.
*/
-public abstract class ColumnVector {
+public abstract class ColumnVector implements AutoCloseable {
/**
* Allocates a column to store elements of `type` on or off heap.
* Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 708a00953a..e97276800d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -387,35 +387,49 @@ public final class OnHeapColumnVector extends ColumnVector {
arrayLengths = newLengths;
arrayOffsets = newOffsets;
} else if (type instanceof BooleanType) {
- byte[] newData = new byte[newCapacity];
- if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended);
- byteData = newData;
+ if (byteData == null || byteData.length < newCapacity) {
+ byte[] newData = new byte[newCapacity];
+ if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended);
+ byteData = newData;
+ }
} else if (type instanceof ByteType) {
- byte[] newData = new byte[newCapacity];
- if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended);
- byteData = newData;
+ if (byteData == null || byteData.length < newCapacity) {
+ byte[] newData = new byte[newCapacity];
+ if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended);
+ byteData = newData;
+ }
} else if (type instanceof ShortType) {
- short[] newData = new short[newCapacity];
- if (shortData != null) System.arraycopy(shortData, 0, newData, 0, elementsAppended);
- shortData = newData;
+ if (shortData == null || shortData.length < newCapacity) {
+ short[] newData = new short[newCapacity];
+ if (shortData != null) System.arraycopy(shortData, 0, newData, 0, elementsAppended);
+ shortData = newData;
+ }
} else if (type instanceof IntegerType || type instanceof DateType ||
DecimalType.is32BitDecimalType(type)) {
- int[] newData = new int[newCapacity];
- if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended);
- intData = newData;
+ if (intData == null || intData.length < newCapacity) {
+ int[] newData = new int[newCapacity];
+ if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended);
+ intData = newData;
+ }
} else if (type instanceof LongType || type instanceof TimestampType ||
DecimalType.is64BitDecimalType(type)) {
- long[] newData = new long[newCapacity];
- if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended);
- longData = newData;
+ if (longData == null || longData.length < newCapacity) {
+ long[] newData = new long[newCapacity];
+ if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended);
+ longData = newData;
+ }
} else if (type instanceof FloatType) {
- float[] newData = new float[newCapacity];
- if (floatData != null) System.arraycopy(floatData, 0, newData, 0, elementsAppended);
- floatData = newData;
+ if (floatData == null || floatData.length < newCapacity) {
+ float[] newData = new float[newCapacity];
+ if (floatData != null) System.arraycopy(floatData, 0, newData, 0, elementsAppended);
+ floatData = newData;
+ }
} else if (type instanceof DoubleType) {
- double[] newData = new double[newCapacity];
- if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, elementsAppended);
- doubleData = newData;
+ if (doubleData == null || doubleData.length < newCapacity) {
+ double[] newData = new double[newCapacity];
+ if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, elementsAppended);
+ doubleData = newData;
+ }
} else if (resultStruct != null) {
// Nothing to store.
} else {