1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
|
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.columnar
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.columnar.ColumnBuilder._
import org.apache.spark.sql.execution.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}
import org.apache.spark.sql.types._
private[columnar] trait ColumnBuilder {
/**
* Initializes with an approximate lower bound on the expected number of elements in this column.
*/
def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false): Unit
/**
* Appends `row(ordinal)` to the column builder.
*/
def appendFrom(row: InternalRow, ordinal: Int): Unit
/**
* Column statistics information
*/
def columnStats: ColumnStats
/**
* Returns the final columnar byte buffer.
*/
def build(): ByteBuffer
}
private[columnar] class BasicColumnBuilder[JvmType](
val columnStats: ColumnStats,
val columnType: ColumnType[JvmType])
extends ColumnBuilder {
protected var columnName: String = _
protected var buffer: ByteBuffer = _
override def initialize(
initialSize: Int,
columnName: String = "",
useCompression: Boolean = false): Unit = {
val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
this.columnName = columnName
buffer = ByteBuffer.allocate(size * columnType.defaultSize)
buffer.order(ByteOrder.nativeOrder())
}
override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal))
columnType.append(row, ordinal, buffer)
}
override def build(): ByteBuffer = {
if (buffer.capacity() > buffer.position() * 1.1) {
// trim the buffer
buffer = ByteBuffer
.allocate(buffer.position())
.order(ByteOrder.nativeOrder())
.put(buffer.array(), 0, buffer.position())
}
buffer.flip().asInstanceOf[ByteBuffer]
}
}
private[columnar] class NullColumnBuilder
extends BasicColumnBuilder[Any](new ObjectColumnStats(NullType), NULL)
with NullableColumnBuilder
private[columnar] abstract class ComplexColumnBuilder[JvmType](
columnStats: ColumnStats,
columnType: ColumnType[JvmType])
extends BasicColumnBuilder[JvmType](columnStats, columnType)
with NullableColumnBuilder
private[columnar] abstract class NativeColumnBuilder[T <: AtomicType](
override val columnStats: ColumnStats,
override val columnType: NativeColumnType[T])
extends BasicColumnBuilder[T#InternalType](columnStats, columnType)
with NullableColumnBuilder
with AllCompressionSchemes
with CompressibleColumnBuilder[T]
private[columnar]
class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
private[columnar]
class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE)
private[columnar] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT)
private[columnar] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
private[columnar] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG)
private[columnar] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT)
private[columnar]
class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE)
private[columnar]
class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)
private[columnar]
class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY)
private[columnar] class CompactDecimalColumnBuilder(dataType: DecimalType)
extends NativeColumnBuilder(new DecimalColumnStats(dataType), COMPACT_DECIMAL(dataType))
private[columnar] class DecimalColumnBuilder(dataType: DecimalType)
extends ComplexColumnBuilder(new DecimalColumnStats(dataType), LARGE_DECIMAL(dataType))
private[columnar] class StructColumnBuilder(dataType: StructType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), STRUCT(dataType))
private[columnar] class ArrayColumnBuilder(dataType: ArrayType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), ARRAY(dataType))
private[columnar] class MapColumnBuilder(dataType: MapType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType))
private[columnar] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024
val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L
private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
if (orig.remaining >= size) {
orig
} else {
// grow in steps of initial size
val capacity = orig.capacity()
val newSize = capacity + size.max(capacity)
val pos = orig.position()
ByteBuffer
.allocate(newSize)
.order(ByteOrder.nativeOrder())
.put(orig.array(), 0, pos)
}
}
def apply(
dataType: DataType,
initialSize: Int = 0,
columnName: String = "",
useCompression: Boolean = false): ColumnBuilder = {
val builder: ColumnBuilder = dataType match {
case NullType => new NullColumnBuilder
case BooleanType => new BooleanColumnBuilder
case ByteType => new ByteColumnBuilder
case ShortType => new ShortColumnBuilder
case IntegerType | DateType => new IntColumnBuilder
case LongType | TimestampType => new LongColumnBuilder
case FloatType => new FloatColumnBuilder
case DoubleType => new DoubleColumnBuilder
case StringType => new StringColumnBuilder
case BinaryType => new BinaryColumnBuilder
case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
new CompactDecimalColumnBuilder(dt)
case dt: DecimalType => new DecimalColumnBuilder(dt)
case struct: StructType => new StructColumnBuilder(struct)
case array: ArrayType => new ArrayColumnBuilder(array)
case map: MapType => new MapColumnBuilder(map)
case udt: UserDefinedType[_] =>
return apply(udt.sqlType, initialSize, columnName, useCompression)
case other =>
throw new Exception(s"not supported type: $other")
}
builder.initialize(initialSize, columnName, useCompression)
builder
}
}
|