 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala            | 12 ++++++++++--
 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala              |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala |  6 +++++-
 3 files changed, 16 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 7a7345a7e0..599f30f2d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -73,6 +73,13 @@ private[sql] class BasicColumnBuilder[JvmType](
   }
 
   override def build(): ByteBuffer = {
+    if (buffer.capacity() > buffer.position() * 1.1) {
+      // trim the buffer
+      buffer = ByteBuffer
+        .allocate(buffer.position())
+        .order(ByteOrder.nativeOrder())
+        .put(buffer.array(), 0, buffer.position())
+    }
     buffer.flip().asInstanceOf[ByteBuffer]
   }
 }
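
The hunk above trims the backing array when less than roughly 90% of it has been written, so a cached batch does not pin the unused tail of a large allocation. A minimal standalone sketch of that behavior follows; `TrimSketch` and `trim` are illustrative names, not Spark's:

    import java.nio.{ByteBuffer, ByteOrder}

    // Sketch of the trim-on-build step above (not Spark code). If the
    // buffer is more than ~10% larger than the bytes actually written,
    // copy those bytes into a right-sized buffer before flipping it
    // for reading.
    object TrimSketch {
      def trim(buffer: ByteBuffer): ByteBuffer = {
        val compact =
          if (buffer.capacity() > buffer.position() * 1.1) {
            ByteBuffer
              .allocate(buffer.position())
              .order(ByteOrder.nativeOrder())
              .put(buffer.array(), 0, buffer.position())
          } else {
            buffer
          }
        compact.flip().asInstanceOf[ByteBuffer]
      }

      def main(args: Array[String]): Unit = {
        val buf = ByteBuffer.allocate(1024).order(ByteOrder.nativeOrder())
        buf.putLong(42L) // only 8 of 1024 bytes used
        val built = trim(buf)
        println(s"capacity=${built.capacity()}, limit=${built.limit()}") // capacity=8, limit=8
      }
    }
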
@@ -129,7 +136,8 @@ private[sql] class MapColumnBuilder(dataType: MapType)
   extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType))
 
 private[sql] object ColumnBuilder {
-  val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
+  val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024
+  val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L
 
   private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
     if (orig.remaining >= size) {
@@ -137,7 +145,7 @@ private[sql] object ColumnBuilder {
     } else {
       // grow in steps of initial size
       val capacity = orig.capacity()
-      val newSize = capacity + size.max(capacity / 8 + 1)
+      val newSize = capacity + size.max(capacity)
       val pos = orig.position()
 
       ByteBuffer
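
Taken together, the two ColumnBuilder hunks change the reallocation pattern: each column builder now starts at 128 KB instead of 1 MB, and when it runs out of room it doubles its capacity instead of adding roughly 12.5%. A rough standalone sketch of the difference, counting copies from the new initial size up to the 4 MB batch cap (not Spark code):

    // Counts how many reallocations each growth rule needs to take a
    // 128 KB buffer past the 4 MB batch cap when appending 8-byte values.
    object GrowthSketch {
      def reallocations(grow: Int => Int): Int = {
        var capacity = 128 * 1024
        var n = 0
        while (capacity < 4 * 1024 * 1024) {
          capacity = grow(capacity)
          n += 1
        }
        n
      }

      def main(args: Array[String]): Unit = {
        val size = 8
        // old rule: capacity + max(size, capacity / 8 + 1) -> ~30 copies
        println(reallocations(c => c + size.max(c / 8 + 1)))
        // new rule: capacity + max(size, capacity) -> 5 copies
        println(reallocations(c => c + size.max(c)))
      }
    }
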
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index ba61003ba4..91a0565058 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -48,7 +48,7 @@ private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Seri
 private[sql] sealed trait ColumnStats extends Serializable {
   protected var count = 0
   protected var nullCount = 0
-  protected var sizeInBytes = 0L
+  private[sql] var sizeInBytes = 0L
 
   /**
    * Gathers statistics information from `row(ordinal)`.
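
This one-line change widens the visibility of sizeInBytes from protected (subclasses only) to private[sql] (anything under the sql package), which is what lets the batch-building loop in InMemoryColumnarTableScan read each builder's running byte count. A toy sketch of the access rule, with illustrative package and class names rather than Spark's:

    // `private[pkg]` opens a member to everything under `pkg`,
    // not just subclasses.
    package demo.columnar {
      trait ColumnStats {
        private[demo] var sizeInBytes = 0L // readable anywhere under `demo`
      }
      class LongColumnStats extends ColumnStats
    }

    package demo.scan {
      object BatchSizer {
        // A sibling package under `demo` can read the stat directly.
        def totalBytes(stats: Seq[demo.columnar.ColumnStats]): Long =
          stats.map(_.sizeInBytes).sum
      }
    }
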
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 2cface61e5..ae77298e6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -133,7 +133,9 @@ private[sql] case class InMemoryRelation(
           }.toArray
 
           var rowCount = 0
-          while (rowIterator.hasNext && rowCount < batchSize) {
+          var totalSize = 0L
+          while (rowIterator.hasNext && rowCount < batchSize
+            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
             val row = rowIterator.next()
 
             // Added for SPARK-6082. This assertion can be useful for scenarios when something
@@ -147,8 +149,10 @@ private[sql] case class InMemoryRelation(
                 s"\nRow content: $row")
 
             var i = 0
+            totalSize = 0
             while (i < row.numFields) {
               columnBuilders(i).appendFrom(row, i)
+              totalSize += columnBuilders(i).columnStats.sizeInBytes
               i += 1
             }
             rowCount += 1
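
With these two hunks, a batch is closed when either limit is hit first: the configured row count (batchSize) or MAX_BATCH_SIZE_IN_BYTE of accumulated column data. Note that totalSize is reset and re-derived on every row from the builders' running sizeInBytes totals, which only grow, so the loop condition always sees the full size of the batch so far. A simplified, self-contained sketch of the loop shape, where `Builder` is a hypothetical stand-in for ColumnBuilder and its column stats:

    // Stand-in for ColumnBuilder/ColumnStats; not Spark code.
    final class Builder {
      var sizeInBytes = 0L // running total, like columnStats.sizeInBytes
      def appendFrom(row: Array[Array[Byte]], i: Int): Unit =
        sizeInBytes += row(i).length
    }

    object BatchLoopSketch {
      val MaxBatchSizeInBytes = 4 * 1024 * 1024L

      // Drain rows into per-column builders until either cap is reached.
      def nextBatch(rows: Iterator[Array[Array[Byte]]],
                    batchSize: Int,
                    numFields: Int): (Array[Builder], Int) = {
        val builders = Array.fill(numFields)(new Builder)
        var rowCount = 0
        var totalSize = 0L
        while (rows.hasNext && rowCount < batchSize
          && totalSize < MaxBatchSizeInBytes) {
          val row = rows.next()
          var i = 0
          totalSize = 0 // re-derived from the monotone per-column totals
          while (i < numFields) {
            builders(i).appendFrom(row, i)
            totalSize += builders(i).sizeInBytes
            i += 1
          }
          rowCount += 1
        }
        (builders, rowCount)
      }
    }
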