author     Shixiong Zhu <shixiong@databricks.com>    2015-12-04 17:02:04 -0800
committer  Marcelo Vanzin <vanzin@cloudera.com>      2015-12-04 17:02:04 -0800
commit     3af53e61fd604fe8000e1fdf656d60b79c842d1c (patch)
tree       b12b7a4cdb05361813fc81f93bbed924ba961822 /sql
parent     f30373f5ee60f9892c28771e34b208e4f1f675a6 (diff)
[SPARK-12084][CORE] Fix code that uses ByteBuffer.array incorrectly
`ByteBuffer` doesn't guarantee that the whole of `ByteBuffer.array` holds valid contents. For example, in a `ByteBuffer` returned by `ByteBuffer.slice`, the valid bytes start at `arrayOffset()` in the shared backing array rather than at index 0. We should not read the full backing array of a `ByteBuffer` unless we know that is correct. This patch fixes all places that use `ByteBuffer.array` incorrectly.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10083 from zsxwing/bytebuffer-array.
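To make the pitfall concrete, here is a minimal, self-contained sketch (illustrative only, not part of the patch; the `ByteBufferPitfall` object name is made up) showing how reading from `array()` at `position()` goes wrong for a buffer produced by `slice()`, and how adding `arrayOffset()` fixes it:

```scala
import java.nio.ByteBuffer

// Illustrative sketch (not from the patch): a sliced buffer shares its
// parent's backing array, so index 0 of the slice lives at arrayOffset()
// in that array, not at array index 0.
object ByteBufferPitfall {
  def main(args: Array[String]): Unit = {
    val backing = ByteBuffer.wrap("XXXXhello".getBytes("UTF-8"))
    backing.position(4)
    val slice = backing.slice() // contents: "hello", arrayOffset() == 4

    // Wrong: position() is 0 relative to the slice, so this reads "XXXXh".
    val wrong = new String(slice.array(), slice.position(), slice.remaining(), "UTF-8")

    // Right: the offset into the backing array is arrayOffset() + position().
    val right = new String(
      slice.array(), slice.arrayOffset() + slice.position(), slice.remaining(), "UTF-8")

    println(s"wrong = $wrong, right = $right") // wrong = XXXXh, right = hello
  }
}
```

This is exactly the pattern the hunks below apply: `remaining()` for the length and `arrayOffset() + position()` for the starting index.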
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala | 8
4 files changed, 14 insertions, 9 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index dade488ca2..0cc4566c9c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -332,12 +332,13 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     for (int n = 0; n < num; ++n) {
       if (columnReaders[col].next()) {
         ByteBuffer bytes = columnReaders[col].nextBinary().toByteBuffer();
-        int len = bytes.limit() - bytes.position();
+        int len = bytes.remaining();
         if (originalTypes[col] == OriginalType.UTF8) {
-          UTF8String str = UTF8String.fromBytes(bytes.array(), bytes.position(), len);
+          UTF8String str =
+              UTF8String.fromBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), len);
           rowWriters[n].write(col, str);
         } else {
-          rowWriters[n].write(col, bytes.array(), bytes.position(), len);
+          rowWriters[n].write(col, bytes.array(), bytes.arrayOffset() + bytes.position(), len);
         }
         rows[n].setNotNullAt(col);
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 8317f648cc..45a8e03248 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Kryo, Serializer}
 import com.twitter.chill.ResourcePool
 
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.util.MutablePair
 
@@ -76,7 +77,7 @@ private[sql] object SparkSqlSerializer {
 
   def serialize[T: ClassTag](o: T): Array[Byte] =
     acquireRelease { k =>
-      k.serialize(o).array()
+      JavaUtils.bufferToArray(k.serialize(o))
     }
 
   def deserialize[T: ClassTag](bytes: Array[Byte]): T =
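The old `k.serialize(o).array()` handed back the serializer's entire backing array, whose valid contents are only the `[position(), limit())` range. A helper like `JavaUtils.bufferToArray` can return the array directly only when it exactly matches the buffer's contents, and copy otherwise. A minimal sketch of that idea (illustrative; not necessarily Spark's exact implementation):

```scala
import java.nio.ByteBuffer

object BufferUtil {
  // Sketch of a bufferToArray helper: zero-copy when it is safe, copy otherwise.
  def bufferToArray(buffer: ByteBuffer): Array[Byte] = {
    if (buffer.hasArray && buffer.arrayOffset() == 0 &&
        buffer.array().length == buffer.remaining()) {
      // The backing array is exactly the buffer's valid contents: no copy needed.
      buffer.array()
    } else {
      // Direct buffer, slice, or oversized backing array: copy just [position, limit).
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      bytes
    }
  }
}
```

The same helper is reused in the columnar-cache change below for the buffers produced by `ColumnBuilder.build()`.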
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index ce701fb3a7..3c5a8cb2aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.columnar
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 
@@ -163,7 +164,9 @@ private[sql] case class InMemoryRelation(
             .flatMap(_.values))
 
           batchStats += stats
-          CachedBatch(rowCount, columnBuilders.map(_.build().array()), stats)
+          CachedBatch(rowCount, columnBuilders.map { builder =>
+            JavaUtils.bufferToArray(builder.build())
+          }, stats)
         }
 
         def hasNext: Boolean = rowIterator.hasNext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 94298fae2d..8851bc23cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -327,8 +327,8 @@ private[parquet] class CatalystRowConverter(
       // are using `Binary.toByteBuffer.array()` to steal the underlying byte array without copying
       // it.
       val buffer = value.toByteBuffer
-      val offset = buffer.position()
-      val numBytes = buffer.limit() - buffer.position()
+      val offset = buffer.arrayOffset() + buffer.position()
+      val numBytes = buffer.remaining()
       updater.set(UTF8String.fromBytes(buffer.array(), offset, numBytes))
     }
   }
@@ -644,8 +644,8 @@ private[parquet] object CatalystRowConverter {
     // copying it.
     val buffer = binary.toByteBuffer
     val bytes = buffer.array()
-    val start = buffer.position()
-    val end = buffer.limit()
+    val start = buffer.arrayOffset() + buffer.position()
+    val end = buffer.arrayOffset() + buffer.limit()
 
     var unscaled = 0L
     var i = start
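The hunk is cut off mid-loop; for context, folding a big-endian two's-complement byte range into an unscaled `Long` typically looks like the following sketch (illustrative, assuming the value fits in 8 bytes; not copied verbatim from the file):

```scala
// Fold the big-endian two's-complement bytes in [start, end) into a Long.
def unscaledLong(bytes: Array[Byte], start: Int, end: Int): Long = {
  var unscaled = 0L
  var i = start
  while (i < end) {
    unscaled = (unscaled << 8) | (bytes(i) & 0xff) // shift in the next byte
    i += 1
  }
  // Sign-extend from the number of bits actually read (at most 64).
  val bits = 8 * (end - start)
  (unscaled << (64 - bits)) >> (64 - bits)
}
```

With the fixed `start` and `end` above, the loop indexes the shared backing array at the correct offsets even when the `ByteBuffer` is a slice.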