author    Sameer Agarwal <sameer@databricks.com>  2016-06-23 18:21:41 -0700
committer Herman van Hovell <hvanhovell@databricks.com>  2016-06-23 18:21:41 -0700
commit    cc71d4fa372f6eb187c68dbd8358de4003ace3fe (patch)
tree      be792fbeb6b1b3b64250d70af475ed82e460feff /sql
parent    264bc63623b20529abcf84abcb333e7c16ad1ef9 (diff)
[SPARK-16123] Avoid NegativeArraySizeException while reserving additional capacity in VectorizedColumnReader
## What changes were proposed in this pull request?

This patch fixes an overflow bug in the vectorized Parquet reader, where both the off-heap and on-heap variants of `ColumnVector.reserve()` can overflow while reserving additional capacity during reads.

## How was this patch tested?

Manual tests.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13832 from sameeragarwal/negative-array.
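For context, here is a minimal standalone sketch (illustrative only, not part of the patch; the class name is hypothetical) of why the old `requiredCapacity * 2` int multiplication wraps negative and how the fix, widening to `long` and clamping at `MAX_CAPACITY`, avoids it:

```java
// Hypothetical demo of the overflow this patch fixes.
public class ReserveOverflowDemo {
  static final int MAX_CAPACITY = Integer.MAX_VALUE;

  public static void main(String[] args) {
    int requiredCapacity = 1_500_000_000;  // ~1.5 billion elements

    // Old behavior: int multiplication wraps around to a negative value,
    // which later surfaces as a NegativeArraySizeException on allocation.
    int oldNewCapacity = requiredCapacity * 2;  // -1294967296

    // Fixed behavior: multiply in long, then clamp to MAX_CAPACITY.
    int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);

    System.out.println("old: " + oldNewCapacity);  // negative, would crash
    System.out.println("new: " + newCapacity);     // Integer.MAX_VALUE
  }
}
```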
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java        | 26
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java  |  8
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java   |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala | 19
4 files changed, 47 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 3f94255256..80c84b1336 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized;
import java.math.BigDecimal;
import java.math.BigInteger;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.NotImplementedException;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.api.Binary;
@@ -27,6 +28,7 @@ import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
@@ -277,11 +279,25 @@ public abstract class ColumnVector implements AutoCloseable {
*/
public abstract void close();
- /*
+ public void reserve(int requiredCapacity) {
+ if (requiredCapacity > capacity) {
+ int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+ if (requiredCapacity <= newCapacity) {
+ reserveInternal(newCapacity);
+ } else {
+ throw new RuntimeException("Cannot reserve more than " + newCapacity +
+ " bytes in the vectorized reader (requested = " + requiredCapacity + " bytes). As a " +
+ "workaround, you can disable the vectorized reader by setting "
+ + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + " to false.");
+ }
+ }
+ }
+
+ /**
* Ensures that there is enough storage to store capacity elements. That is, the put() APIs
* must work for all rowIds < capacity.
*/
- public abstract void reserve(int capacity);
+ protected abstract void reserveInternal(int capacity);
/**
* Returns the number of nulls in this column.
@@ -847,6 +863,12 @@ public abstract class ColumnVector implements AutoCloseable {
protected int capacity;
/**
+ * Upper limit for the maximum capacity for this column.
+ */
+ @VisibleForTesting
+ protected int MAX_CAPACITY = Integer.MAX_VALUE;
+
+ /**
* Data type for this column.
*/
protected final DataType type;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 70b4a68331..913a05a0aa 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -422,13 +422,9 @@ public final class OffHeapColumnVector extends ColumnVector {
array.byteArrayOffset = 0;
}
- @Override
- public void reserve(int requiredCapacity) {
- if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
- }
-
// Split out the slow path.
- private void reserveInternal(int newCapacity) {
+ @Override
+ protected void reserveInternal(int newCapacity) {
if (this.resultArray != null) {
this.lengthData =
Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 7fb7617050..85067df4eb 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -392,13 +392,9 @@ public final class OnHeapColumnVector extends ColumnVector {
return result;
}
- @Override
- public void reserve(int requiredCapacity) {
- if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
- }
-
// Split this function out since it is the slow path.
- private void reserveInternal(int newCapacity) {
+ @Override
+ protected void reserveInternal(int newCapacity) {
if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
int[] newLengths = new int[newCapacity];
int[] newOffsets = new int[newCapacity];
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 7e576a8657..100cc4daca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -787,4 +787,23 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
}
}
+
+ test("exceeding maximum capacity should throw an error") {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
+ val column = ColumnVector.allocate(1, ByteType, memMode)
+ column.MAX_CAPACITY = 15
+ column.appendBytes(5, 0.toByte)
+ // Successfully allocate twice the requested capacity
+ assert(column.capacity == 10)
+ column.appendBytes(10, 0.toByte)
+ // Allocated capacity doesn't exceed MAX_CAPACITY
+ assert(column.capacity == 15)
+ val ex = intercept[RuntimeException] {
+ // Over-allocating beyond MAX_CAPACITY throws an exception
+ column.appendBytes(10, 0.toByte)
+ }
+ assert(ex.getMessage.contains(s"Cannot reserve more than ${column.MAX_CAPACITY} bytes in " +
+ s"the vectorized reader"))
+ }
+ }
}
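For illustration, a hypothetical standalone mirror of the patched `reserve()` semantics (class and field names here are made up; `MAX_CAPACITY` is lowered to 15, as in the test above, so the error path is easy to hit):

```java
// Hypothetical sketch mirroring the patched reserve() logic; not Spark code.
public class ReserveSemanticsDemo {
  static int capacity = 0;
  static int MAX_CAPACITY = 15;  // illustrative cap, matching the test above

  static void reserve(int requiredCapacity) {
    if (requiredCapacity > capacity) {
      // Double the request in long arithmetic, clamped at MAX_CAPACITY.
      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
      if (requiredCapacity <= newCapacity) {
        capacity = newCapacity;  // stands in for reserveInternal(newCapacity)
      } else {
        throw new RuntimeException("Cannot reserve more than " + newCapacity +
          " bytes in the vectorized reader (requested = " + requiredCapacity + " bytes).");
      }
    }
  }

  public static void main(String[] args) {
    reserve(5);   // request doubled: capacity becomes 10
    reserve(15);  // 15 * 2 = 30 is clamped to MAX_CAPACITY: capacity becomes 15
    reserve(25);  // exceeds MAX_CAPACITY: throws RuntimeException
  }
}
```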