author     Wenchen Fan <wenchen@databricks.com>     2015-10-19 11:02:26 -0700
committer  Davies Liu <davies.liu@gmail.com>        2015-10-19 11:02:26 -0700
commit     7893cd95db5f2caba59ff5c859d7e4964ad7938d (patch)
tree       6bfb747406a4e138b0f6042c51b420940a378836 /sql/core
parent     4c33a34ba3167ae67fdb4978ea2166ce65638fb9 (diff)
[SPARK-11119] [SQL] cleanup for unsafe array and map
The purpose of this PR is to keep the unsafe format details only inside the unsafe classes themselves, so that when we use them (e.g. an unsafe array inside an unsafe map, or unsafe arrays and maps in the columnar cache) we don't need to understand the format before using them.

Change list:

* The unsafe array's 4-byte numElements header is now required (it was optional before) and becomes part of the unsafe array format.
* Following that change, the `sizeInBytes` of an unsafe array now counts the 4-byte header.
* The unsafe map's format used to be `[numElements] [key array numBytes] [key array content (without numElements header)] [value array content (without numElements header)]`, which is a little hacky because it makes the unsafe array's header optional. Saving 4 bytes is not a big deal, so the format is now: `[key array numBytes] [unsafe key array] [unsafe value array]`.
* Following that change, the `sizeInBytes` of an unsafe map now counts both the map's header and the arrays' headers.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9131 from cloud-fan/unsafe.
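To make the new map layout concrete, here is a minimal sketch of how a reader could walk the `[key array numBytes] [unsafe key array] [unsafe value array]` bytes. It is illustrative only (the real parsing lives in the unsafe classes), and `describeUnsafeMap`, `bytes`, and `mapOffset` are hypothetical names:

import java.nio.{ByteBuffer, ByteOrder}

// Sketch, not the actual UnsafeMapData implementation: walk a map laid out
// as [key array numBytes] [unsafe key array] [unsafe value array], where
// each unsafe array starts with its now-mandatory 4-byte numElements header.
def describeUnsafeMap(bytes: Array[Byte], mapOffset: Int): Unit = {
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder())
  val keyArrayNumBytes = buf.getInt(mapOffset)        // the map's only header field
  val keyNumElements = buf.getInt(mapOffset + 4)      // key array's own header
  val valueArrayStart = mapOffset + 4 + keyArrayNumBytes
  val valueNumElements = buf.getInt(valueArrayStart)  // value array's own header
  println(s"$keyNumElements keys in $keyArrayNumBytes bytes; " +
    s"$valueNumElements values starting at byte offset $valueArrayStart")
}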
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala       30
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala   2
2 files changed, 17 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index 2bc2c96b61..a41f04dd3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -482,12 +482,14 @@ private[sql] case class STRUCT(dataType: StructType) extends ColumnType[UnsafeRo
override def extract(buffer: ByteBuffer): UnsafeRow = {
val sizeInBytes = buffer.getInt()
assert(buffer.hasArray)
- val base = buffer.array()
- val offset = buffer.arrayOffset()
val cursor = buffer.position()
buffer.position(cursor + sizeInBytes)
val unsafeRow = new UnsafeRow
- unsafeRow.pointTo(base, Platform.BYTE_ARRAY_OFFSET + offset + cursor, numOfFields, sizeInBytes)
+ unsafeRow.pointTo(
+ buffer.array(),
+ Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor,
+ numOfFields,
+ sizeInBytes)
unsafeRow
}
@@ -508,12 +510,11 @@ private[sql] case class ARRAY(dataType: ArrayType) extends ColumnType[UnsafeArra
override def actualSize(row: InternalRow, ordinal: Int): Int = {
val unsafeArray = getField(row, ordinal)
- 4 + 4 + unsafeArray.getSizeInBytes
+ 4 + unsafeArray.getSizeInBytes
}
override def append(value: UnsafeArrayData, buffer: ByteBuffer): Unit = {
- buffer.putInt(4 + value.getSizeInBytes)
- buffer.putInt(value.numElements())
+ buffer.putInt(value.getSizeInBytes)
value.writeTo(buffer)
}
@@ -522,10 +523,12 @@ private[sql] case class ARRAY(dataType: ArrayType) extends ColumnType[UnsafeArra
assert(buffer.hasArray)
val cursor = buffer.position()
buffer.position(cursor + numBytes)
- UnsafeReaders.readArray(
+ val array = new UnsafeArrayData
+ array.pointTo(
buffer.array(),
Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor,
numBytes)
+ array
}
override def clone(v: UnsafeArrayData): UnsafeArrayData = v.copy()
@@ -545,15 +548,12 @@ private[sql] case class MAP(dataType: MapType) extends ColumnType[UnsafeMapData]
override def actualSize(row: InternalRow, ordinal: Int): Int = {
val unsafeMap = getField(row, ordinal)
- 12 + unsafeMap.keyArray().getSizeInBytes + unsafeMap.valueArray().getSizeInBytes
+ 4 + unsafeMap.getSizeInBytes
}
override def append(value: UnsafeMapData, buffer: ByteBuffer): Unit = {
- buffer.putInt(8 + value.keyArray().getSizeInBytes + value.valueArray().getSizeInBytes)
- buffer.putInt(value.numElements())
- buffer.putInt(value.keyArray().getSizeInBytes)
- value.keyArray().writeTo(buffer)
- value.valueArray().writeTo(buffer)
+ buffer.putInt(value.getSizeInBytes)
+ value.writeTo(buffer)
}
override def extract(buffer: ByteBuffer): UnsafeMapData = {
@@ -561,10 +561,12 @@ private[sql] case class MAP(dataType: MapType) extends ColumnType[UnsafeMapData]
assert(buffer.hasArray)
val cursor = buffer.position()
buffer.position(cursor + numBytes)
- UnsafeReaders.readMap(
+ val map = new UnsafeMapData
+ map.pointTo(
buffer.array(),
Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor,
numBytes)
+ map
}
override def clone(v: UnsafeMapData): UnsafeMapData = v.copy()
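For readers following the hunks above, here is the resulting append/extract round trip for ARRAY gathered in one place. This is a sketch assuming the file's surrounding imports (`Platform`, `UnsafeArrayData`, `java.nio.ByteBuffer`) and a heap-backed buffer, which is what `assert(buffer.hasArray)` guards:

// Sketch of the post-patch round trip: append writes a 4-byte length prefix
// followed by the raw unsafe bytes (numElements header included), and extract
// points a fresh UnsafeArrayData at that slice of the buffer's backing array.
def appendArray(value: UnsafeArrayData, buffer: ByteBuffer): Unit = {
  buffer.putInt(value.getSizeInBytes)
  value.writeTo(buffer)
}

def extractArray(buffer: ByteBuffer): UnsafeArrayData = {
  val numBytes = buffer.getInt()
  val cursor = buffer.position()
  buffer.position(cursor + numBytes)  // consume the whole value in one jump
  val array = new UnsafeArrayData
  array.pointTo(
    buffer.array(),
    Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor,
    numBytes)
  array
}

MAP follows the identical pattern with UnsafeMapData, so both column types stay ignorant of the internal byte layout.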
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 0e6e1bcf72..63bc39bfa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -73,7 +73,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5)
checkActualSize(ARRAY_TYPE, Array[Any](1), 16)
- checkActualSize(MAP_TYPE, Map(1 -> "a"), 25)
+ checkActualSize(MAP_TYPE, Map(1 -> "a"), 29)
checkActualSize(STRUCT_TYPE, Row("hello"), 28)
}
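A quick derivation of the 25 -> 29 change for `Map(1 -> "a")`, using only the old and new `actualSize` formulas from the hunks above (the 13 bytes of raw key/value content is inferred from the old expectation, not measured):

// Old: actualSize = 12-byte header (length + numElements + key numBytes)
//      + raw key array + raw value array, both without their own headers.
val rawKeyPlusValue = 25 - 12           // = 13 bytes of raw content
// New: the map's sizeInBytes = the [key array numBytes] field plus the key
//      and value arrays, each now carrying its own 4-byte numElements header.
val mapSizeInBytes = 4 + rawKeyPlusValue + 4 + 4   // = 25
// The column type prepends its own 4-byte length field on top of that.
val newActualSize = 4 + mapSizeInBytes             // = 29
assert(newActualSize == 29)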