Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala                             |  2 +-
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala                     |  9 +++++++--
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala     | 22 ++++++++++++----------
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala             | 16 ----------------
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala |  1 -
5 files changed, 20 insertions(+), 30 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index c416a74573..7e7bb2859b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -118,7 +118,7 @@ private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
 private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)
 
 private[sql] object ColumnBuilder {
-  val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
+  val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
 
   private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
     if (orig.remaining >= size) {
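
Note: the old default `10 * 1024 * 104` evaluates to 1,064,960 bytes, almost certainly a typo for `10 * 1024 * 1024` (10 MiB); the fix opts for a clean 1 MiB instead and leans on `ensureFreeSpace` to grow the buffer on demand. The hunk cuts off inside that helper, so the sketch below is illustrative only: it reuses the real signature but assumes a plain allocate-and-copy growth strategy, which need not match the actual growth policy.

    import java.nio.{ByteBuffer, ByteOrder}

    // Illustrative only: return the original buffer when it still has room,
    // otherwise allocate a larger one and copy over the bytes written so far.
    def ensureFreeSpace(orig: ByteBuffer, size: Int): ByteBuffer = {
      if (orig.remaining >= size) {
        orig
      } else {
        val pos = orig.position()
        ByteBuffer
          .allocate(pos + size)        // grow just enough; a real impl may over-allocate
          .order(ByteOrder.nativeOrder())
          .put(orig.array(), 0, pos)   // carry over what was already written
      }
    }
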
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
index d008806eed..f631ee76fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -36,9 +36,9 @@ import org.apache.spark.sql.Row
  * }}}
  */
 private[sql] trait NullableColumnBuilder extends ColumnBuilder {
-  private var nulls: ByteBuffer = _
+  protected var nulls: ByteBuffer = _
+  protected var nullCount: Int = _
   private var pos: Int = _
-  private var nullCount: Int = _
 
   abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
     nulls = ByteBuffer.allocate(1024)
@@ -78,4 +78,9 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
     buffer.rewind()
     buffer
   }
+
+  protected def buildNonNulls(): ByteBuffer = {
+    nulls.limit(nulls.position()).rewind()
+    super.build()
+  }
 }
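
Note: buildNonNulls() flips the null-positions buffer from write mode to read mode and then returns super.build(), i.e. the column bytes without a null header; promoting `nulls` and `nullCount` to protected lets the compressing subclass write that header itself. The `limit(position()).rewind()` pair is the standard java.nio write-to-read flip, equivalent to `flip()`. A self-contained REPL illustration:

    import java.nio.ByteBuffer

    val buf = ByteBuffer.allocate(16)
    buf.putInt(3)                        // write mode: position advances to 4
    buf.putInt(7)                        // ... and to 8
    buf.limit(buf.position()).rewind()   // read mode: limit = 8, position = 0
    assert(buf.getInt() == 3)
    assert(buf.getInt() == 7)
    assert(buf.remaining() == 0)         // exactly the bytes written are readable
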
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 6ad12a0dcb..a5826bb033 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -46,8 +46,6 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
   this: NativeColumnBuilder[T] with WithCompressionSchemes =>
 
-  import CompressionScheme._
-
   var compressionEncoders: Seq[Encoder[T]] = _
 
   abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
@@ -81,28 +79,32 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
     }
   }
 
-  abstract override def build() = {
-    val rawBuffer = super.build()
+  override def build() = {
+    val nonNullBuffer = buildNonNulls()
+    val typeId = nonNullBuffer.getInt()
     val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
     }
 
-    val headerSize = columnHeaderSize(rawBuffer)
+    // Header = column type ID + null count + null positions
+    val headerSize = 4 + 4 + nulls.limit()
     val compressedSize = if (encoder.compressedSize == 0) {
-      rawBuffer.limit - headerSize
+      nonNullBuffer.remaining()
     } else {
       encoder.compressedSize
     }
 
-    // Reserves 4 bytes for compression scheme ID
     val compressedBuffer = ByteBuffer
+      // Reserves 4 bytes for compression scheme ID
       .allocate(headerSize + 4 + compressedSize)
       .order(ByteOrder.nativeOrder)
-
-    copyColumnHeader(rawBuffer, compressedBuffer)
+      // Write the header
+      .putInt(typeId)
+      .putInt(nullCount)
+      .put(nulls)
 
     logInfo(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}")
-    encoder.compress(rawBuffer, compressedBuffer, columnType)
+    encoder.compress(nonNullBuffer, compressedBuffer, columnType)
   }
 }
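
Note: build() now assembles the final buffer in one place. Its layout is [column type ID: 4 bytes][null count: 4][null positions: 4 x N][compression scheme ID: 4][compressed non-null data], where the scheme ID and payload fill the 4 reserved bytes and beyond via encoder.compress. A hypothetical reader for that framing (readColumnHeader is an invented name, not part of this patch) makes it explicit:

    import java.nio.ByteBuffer

    // Hypothetical helper, assuming the layout produced by build() above.
    def readColumnHeader(buf: ByteBuffer): (Int, Array[Int], Int) = {
      val typeId = buf.getInt()                                // column type ID
      val nullCount = buf.getInt()                             // number of null rows
      val nullPositions = Array.fill(nullCount)(buf.getInt())  // row ordinals of the nulls
      val schemeId = buf.getInt()                              // compression scheme ID
      (typeId, nullPositions, schemeId)  // buf now points at the compressed payload
    }
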
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index ba1810dd2a..7797f75177 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -67,22 +67,6 @@ private[sql] object CompressionScheme {
s"Unrecognized compression scheme type ID: $typeId"))
}
- def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
- // Writes column type ID
- to.putInt(from.getInt())
-
- // Writes null count
- val nullCount = from.getInt()
- to.putInt(nullCount)
-
- // Writes null positions
- var i = 0
- while (i < nullCount) {
- to.putInt(from.getInt())
- i += 1
- }
- }
-
def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
val nullCount = header.getInt(4)
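
Note: copyColumnHeader became dead code because build() writes the header directly rather than copying it field by field out of a fully built buffer. columnHeaderSize survives for the read side; the hunk is truncated here, but its arithmetic follows from the layout above: header.getInt(4) is an absolute read of the null count at byte offset 4, just past the type ID, giving a header of 4 + 4 + 4 * nullCount bytes. A worked example under that assumption (headerSize is an illustrative stand-in, not the method itself):

    // Assumed from the layout above, since the method body is cut off here:
    // 4 bytes (type ID) + 4 bytes (null count) + 4 bytes per null position.
    def headerSize(nullCount: Int): Int = 4 + 4 + 4 * nullCount

    assert(headerSize(0) == 8)   // a column with no nulls: type ID + count only
    assert(headerSize(3) == 20)  // three nulls add 12 bytes of positions
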
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
index 6d688ea95c..72c19fa31d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
@@ -42,4 +42,3 @@ object TestCompressibleColumnBuilder {
     builder
   }
 }
-