diff options
author | Sameer Agarwal <sameer@databricks.com> | 2016-06-23 18:21:41 -0700 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-06-23 18:21:41 -0700 |
commit | cc71d4fa372f6eb187c68dbd8358de4003ace3fe (patch) | |
tree | be792fbeb6b1b3b64250d70af475ed82e460feff /sql/core/src | |
parent | 264bc63623b20529abcf84abcb333e7c16ad1ef9 (diff) | |
download | spark-cc71d4fa372f6eb187c68dbd8358de4003ace3fe.tar.gz spark-cc71d4fa372f6eb187c68dbd8358de4003ace3fe.tar.bz2 spark-cc71d4fa372f6eb187c68dbd8358de4003ace3fe.zip |
[SPARK-16123] Avoid NegativeArraySizeException while reserving additional capacity in VectorizedColumnReader
## What changes were proposed in this pull request?
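This patch fixes an integer overflow bug in the vectorized Parquet reader. Both the off-heap and on-heap variants of `ColumnVector.reserve()` computed the new capacity as `requiredCapacity * 2` in `int` arithmetic, which can overflow to a negative value while reserving additional capacity during reads (surfacing as a `NegativeArraySizeException`). The doubled capacity is now computed as a `long` and capped at `MAX_CAPACITY` in a single shared `ColumnVector.reserve()`; if the requested capacity still exceeds that cap, a `RuntimeException` is thrown that suggests disabling the vectorized reader as a workaround.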
## How was this patch tested?
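Manual tests, plus a new unit test in `ColumnarBatchSuite` ("exceeding maximum capacity should throw an error") that lowers `MAX_CAPACITY` and checks both the capped growth and the thrown exception for the on-heap and off-heap column vectors.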
Author: Sameer Agarwal <sameer@databricks.com>
Closes #13832 from sameeragarwal/negative-array.
Diffstat (limited to 'sql/core/src')
4 files changed, 47 insertions, 14 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 3f94255256..80c84b1336 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized; import java.math.BigDecimal; import java.math.BigInteger; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.NotImplementedException; import org.apache.parquet.column.Dictionary; import org.apache.parquet.io.api.Binary; @@ -27,6 +28,7 @@ import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -277,11 +279,25 @@ public abstract class ColumnVector implements AutoCloseable { */ public abstract void close(); - /* + public void reserve(int requiredCapacity) { + if (requiredCapacity > capacity) { + int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L); + if (requiredCapacity <= newCapacity) { + reserveInternal(newCapacity); + } else { + throw new RuntimeException("Cannot reserve more than " + newCapacity + + " bytes in the vectorized reader (requested = " + requiredCapacity + " bytes). As a " + + "workaround, you can disable the vectorized reader by setting " + + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + " to false."); + } + } + } + + /** * Ensures that there is enough storage to store capcity elements. That is, the put() APIs * must work for all rowIds < capcity. 
*/ - public abstract void reserve(int capacity); + protected abstract void reserveInternal(int capacity); /** * Returns the number of nulls in this column. @@ -847,6 +863,12 @@ public abstract class ColumnVector implements AutoCloseable { protected int capacity; /** + * Upper limit for the maximum capacity for this column. + */ + @VisibleForTesting + protected int MAX_CAPACITY = Integer.MAX_VALUE; + + /** * Data type for this column. */ protected final DataType type; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 70b4a68331..913a05a0aa 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -422,13 +422,9 @@ public final class OffHeapColumnVector extends ColumnVector { array.byteArrayOffset = 0; } - @Override - public void reserve(int requiredCapacity) { - if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2); - } - // Split out the slow path. 
- private void reserveInternal(int newCapacity) { + @Override + protected void reserveInternal(int newCapacity) { if (this.resultArray != null) { this.lengthData = Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 7fb7617050..85067df4eb 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -392,13 +392,9 @@ public final class OnHeapColumnVector extends ColumnVector { return result; } - @Override - public void reserve(int requiredCapacity) { - if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2); - } - // Spilt this function out since it is the slow path. - private void reserveInternal(int newCapacity) { + @Override + protected void reserveInternal(int newCapacity) { if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { int[] newLengths = new int[newCapacity]; int[] newOffsets = new int[newCapacity]; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 7e576a8657..100cc4daca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -787,4 +787,23 @@ class ColumnarBatchSuite extends SparkFunSuite { } } } + + test("exceeding maximum capacity should throw an error") { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => + val column = ColumnVector.allocate(1, ByteType, memMode) + column.MAX_CAPACITY = 15 + column.appendBytes(5, 0.toByte) + // 
Successfully allocate twice the requested capacity + assert(column.capacity == 10) + column.appendBytes(10, 0.toByte) + // Allocated capacity doesn't exceed MAX_CAPACITY + assert(column.capacity == 15) + val ex = intercept[RuntimeException] { + // Over-allocating beyond MAX_CAPACITY throws an exception + column.appendBytes(10, 0.toByte) + } + assert(ex.getMessage.contains(s"Cannot reserve more than ${column.MAX_CAPACITY} bytes in " + + s"the vectorized reader")) + } + } } |