diff options
author | Wenchen Fan <wenchen@databricks.com> | 2015-10-19 11:02:26 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-10-19 11:02:26 -0700 |
commit | 7893cd95db5f2caba59ff5c859d7e4964ad7938d (patch) | |
tree | 6bfb747406a4e138b0f6042c51b420940a378836 /sql/core | |
parent | 4c33a34ba3167ae67fdb4978ea2166ce65638fb9 (diff) | |
download | spark-7893cd95db5f2caba59ff5c859d7e4964ad7938d.tar.gz spark-7893cd95db5f2caba59ff5c859d7e4964ad7938d.tar.bz2 spark-7893cd95db5f2caba59ff5c859d7e4964ad7938d.zip |
[SPARK-11119] [SQL] cleanup for unsafe array and map
The purpose of this PR is to keep the unsafe format details only inside the unsafe classes themselves, so that when we use them (e.g. an unsafe array inside an unsafe map, or unsafe arrays and maps in the columnar cache), we don't need to understand the format before using them.
change list:
* unsafe array's 4-byte numElements header is now required (it was optional), and becomes a part of the unsafe array format.
* w.r.t. the previous change, the `sizeInBytes` of an unsafe array now counts the 4-byte header.
* unsafe map's format was `[numElements] [key array numBytes] [key array content(without numElements header)] [value array content(without numElements header)]` before, which is a little hacky as it makes unsafe array's header optional. I think saving 4 bytes is not a big deal, so the format is now: `[key array numBytes] [unsafe key array] [unsafe value array]`.
* w.r.t. the previous change, the `sizeInBytes` of an unsafe map now counts both the map's header and the arrays' headers.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #9131 from cloud-fan/unsafe.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala | 30 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala | 2 |
2 files changed, 17 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index 2bc2c96b61..a41f04dd3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -482,12 +482,14 @@ private[sql] case class STRUCT(dataType: StructType) extends ColumnType[UnsafeRo override def extract(buffer: ByteBuffer): UnsafeRow = { val sizeInBytes = buffer.getInt() assert(buffer.hasArray) - val base = buffer.array() - val offset = buffer.arrayOffset() val cursor = buffer.position() buffer.position(cursor + sizeInBytes) val unsafeRow = new UnsafeRow - unsafeRow.pointTo(base, Platform.BYTE_ARRAY_OFFSET + offset + cursor, numOfFields, sizeInBytes) + unsafeRow.pointTo( + buffer.array(), + Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, + numOfFields, + sizeInBytes) unsafeRow } @@ -508,12 +510,11 @@ private[sql] case class ARRAY(dataType: ArrayType) extends ColumnType[UnsafeArra override def actualSize(row: InternalRow, ordinal: Int): Int = { val unsafeArray = getField(row, ordinal) - 4 + 4 + unsafeArray.getSizeInBytes + 4 + unsafeArray.getSizeInBytes } override def append(value: UnsafeArrayData, buffer: ByteBuffer): Unit = { - buffer.putInt(4 + value.getSizeInBytes) - buffer.putInt(value.numElements()) + buffer.putInt(value.getSizeInBytes) value.writeTo(buffer) } @@ -522,10 +523,12 @@ private[sql] case class ARRAY(dataType: ArrayType) extends ColumnType[UnsafeArra assert(buffer.hasArray) val cursor = buffer.position() buffer.position(cursor + numBytes) - UnsafeReaders.readArray( + val array = new UnsafeArrayData + array.pointTo( buffer.array(), Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, numBytes) + array } override def clone(v: UnsafeArrayData): UnsafeArrayData = v.copy() @@ -545,15 +548,12 @@ private[sql] case class MAP(dataType: MapType) extends 
ColumnType[UnsafeMapData] override def actualSize(row: InternalRow, ordinal: Int): Int = { val unsafeMap = getField(row, ordinal) - 12 + unsafeMap.keyArray().getSizeInBytes + unsafeMap.valueArray().getSizeInBytes + 4 + unsafeMap.getSizeInBytes } override def append(value: UnsafeMapData, buffer: ByteBuffer): Unit = { - buffer.putInt(8 + value.keyArray().getSizeInBytes + value.valueArray().getSizeInBytes) - buffer.putInt(value.numElements()) - buffer.putInt(value.keyArray().getSizeInBytes) - value.keyArray().writeTo(buffer) - value.valueArray().writeTo(buffer) + buffer.putInt(value.getSizeInBytes) + value.writeTo(buffer) } override def extract(buffer: ByteBuffer): UnsafeMapData = { @@ -561,10 +561,12 @@ private[sql] case class MAP(dataType: MapType) extends ColumnType[UnsafeMapData] assert(buffer.hasArray) val cursor = buffer.position() buffer.position(cursor + numBytes) - UnsafeReaders.readMap( + val map = new UnsafeMapData + map.pointTo( buffer.array(), Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, numBytes) + map } override def clone(v: UnsafeMapData): UnsafeMapData = v.copy() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala index 0e6e1bcf72..63bc39bfa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala @@ -73,7 +73,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging { checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8) checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5) checkActualSize(ARRAY_TYPE, Array[Any](1), 16) - checkActualSize(MAP_TYPE, Map(1 -> "a"), 25) + checkActualSize(MAP_TYPE, Map(1 -> "a"), 29) checkActualSize(STRUCT_TYPE, Row("hello"), 28) } |