author    Davies Liu <davies@databricks.com>   2015-11-17 12:50:01 -0800
committer Davies Liu <davies.liu@gmail.com>    2015-11-17 12:50:01 -0800
commit    5aca6ad00c9d7fa43c725b8da4a10114a3a77421
tree      40c175bd3c9c424b8efb25c51fa2da55291ebc72
parent    d98d1cb000c8c4e391d73ae86efd09f15e5d165c
[SPARK-11767] [SQL] limit the size of cached batch
Currently the size of a cached batch is only controlled by `batchSize` (default value is 10000), which does not work well with the size of the serialized columns (for example, complex types). The memory used to build the batch is not accounted for, so it is easy to OOM (especially after unified memory management). This PR introduces a hard limit of 4 MB for the total size of the columns (enough for up to about 50 uncompressed primitive columns). It also changes the way the buffer grows: it is doubled each time, then trimmed once finished.

cc liancheng

Author: Davies Liu <davies@databricks.com>

Closes #9760 from davies/cache_limit.
 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala             | 12
 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala               |  2
 sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala |  6
 3 files changed, 16 insertions(+), 4 deletions(-)
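The two buffer changes in ColumnBuilder.scala below (trim on build, grow by doubling) can be illustrated in isolation. The following is a minimal, self-contained Scala sketch of that strategy; it is not the actual ColumnBuilder code, and the object name BufferGrowthSketch and the trim helper are made up for illustration:

import java.nio.{ByteBuffer, ByteOrder}

object BufferGrowthSketch {

  // Grow by at least doubling: new capacity = old capacity + max(requested size, old capacity),
  // matching the "double it each time" idea in the patch.
  def ensureFreeSpace(orig: ByteBuffer, size: Int): ByteBuffer = {
    if (orig.remaining >= size) {
      orig
    } else {
      val capacity = orig.capacity()
      val newSize = capacity + size.max(capacity)
      val pos = orig.position()
      ByteBuffer
        .allocate(newSize)
        .order(ByteOrder.nativeOrder())
        .put(orig.array(), 0, pos)
    }
  }

  // Trim once finished: if more than ~10% of the buffer is unused,
  // reallocate it at exactly the written size before handing it out.
  def trim(buffer: ByteBuffer): ByteBuffer = {
    val trimmed =
      if (buffer.capacity() > buffer.position() * 1.1) {
        ByteBuffer
          .allocate(buffer.position())
          .order(ByteOrder.nativeOrder())
          .put(buffer.array(), 0, buffer.position())
      } else {
        buffer
      }
    trimmed.flip().asInstanceOf[ByteBuffer]
  }

  def main(args: Array[String]): Unit = {
    var buf = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder())
    for (i <- 0 until 100) {        // write 100 ints (400 bytes)
      buf = ensureFreeSpace(buf, 4)
      buf.putInt(i)
    }
    val finished = trim(buf)
    // Growth went 16 -> 32 -> 64 -> 128 -> 256 -> 512; the trim brings it back to 400.
    println(s"capacity=${finished.capacity()}, limit=${finished.limit()}")
  }
}

Doubling keeps the number of reallocations logarithmic in the final size, while the final trim keeps a large over-allocation (up to roughly half the buffer) from staying in the cache.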
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 7a7345a7e0..599f30f2d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -73,6 +73,13 @@ private[sql] class BasicColumnBuilder[JvmType](
}
override def build(): ByteBuffer = {
+ if (buffer.capacity() > buffer.position() * 1.1) {
+ // trim the buffer
+ buffer = ByteBuffer
+ .allocate(buffer.position())
+ .order(ByteOrder.nativeOrder())
+ .put(buffer.array(), 0, buffer.position())
+ }
buffer.flip().asInstanceOf[ByteBuffer]
}
}
@@ -129,7 +136,8 @@ private[sql] class MapColumnBuilder(dataType: MapType)
extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType))
private[sql] object ColumnBuilder {
- val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
+ val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024
+ val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L
private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
if (orig.remaining >= size) {
@@ -137,7 +145,7 @@ private[sql] object ColumnBuilder {
} else {
// grow in steps of initial size
val capacity = orig.capacity()
- val newSize = capacity + size.max(capacity / 8 + 1)
+ val newSize = capacity + size.max(capacity)
val pos = orig.position()
ByteBuffer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index ba61003ba4..91a0565058 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -48,7 +48,7 @@ private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Seri
private[sql] sealed trait ColumnStats extends Serializable {
protected var count = 0
protected var nullCount = 0
- protected var sizeInBytes = 0L
+ private[sql] var sizeInBytes = 0L
/**
* Gathers statistics information from `row(ordinal)`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 2cface61e5..ae77298e6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -133,7 +133,9 @@ private[sql] case class InMemoryRelation(
}.toArray
var rowCount = 0
- while (rowIterator.hasNext && rowCount < batchSize) {
+ var totalSize = 0L
+ while (rowIterator.hasNext && rowCount < batchSize
+ && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
val row = rowIterator.next()
// Added for SPARK-6082. This assertion can be useful for scenarios when something
@@ -147,8 +149,10 @@ private[sql] case class InMemoryRelation(
s"\nRow content: $row")
var i = 0
+ totalSize = 0
while (i < row.numFields) {
columnBuilders(i).appendFrom(row, i)
+ totalSize += columnBuilders(i).columnStats.sizeInBytes
i += 1
}
rowCount += 1
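For the batching change in InMemoryColumnarTableScan.scala, the shape of the new loop is easier to see outside the diff. Below is a simplified, self-contained Scala sketch of the idea; ToyColumnBuilder and buildBatch are invented stand-ins for ColumnBuilder/ColumnStats and InMemoryRelation's batch-building code, and buffer growth inside the toy builder is elided:

import java.nio.{ByteBuffer, ByteOrder}

object BatchLimitSketch {

  val MaxBatchSizeInBytes = 4L * 1024 * 1024 // the new hard 4 MB cap
  val BatchSize = 10000                      // the existing row-count cap (default batchSize)

  // Toy stand-in for a ColumnBuilder that tracks its accumulated size,
  // like ColumnStats.sizeInBytes does for the real builders.
  class ToyColumnBuilder {
    private val buffer = ByteBuffer.allocate(4 * 1024 * 1024).order(ByteOrder.nativeOrder())
    var sizeInBytes = 0L
    def append(value: Array[Byte]): Unit = {
      buffer.put(value) // growth elided; the demo stays under 4 MB per column
      sizeInBytes += value.length
    }
  }

  def buildBatch(rows: Iterator[Array[Array[Byte]]], numColumns: Int): Int = {
    val builders = Array.fill(numColumns)(new ToyColumnBuilder)
    var rowCount = 0
    var totalSize = 0L
    // Stop on whichever limit is hit first: end of input, row count, or total bytes.
    while (rows.hasNext && rowCount < BatchSize && totalSize < MaxBatchSizeInBytes) {
      val row = rows.next()
      var i = 0
      totalSize = 0
      while (i < numColumns) {
        builders(i).append(row(i))
        totalSize += builders(i).sizeInBytes
        i += 1
      }
      rowCount += 1
    }
    rowCount
  }

  def main(args: Array[String]): Unit = {
    // Rows with two 1 KB columns: the 4 MB cap stops the batch at 2048 rows,
    // long before the 10000-row limit would.
    val rows = Iterator.continually(Array.fill(2)(new Array[Byte](1024))).take(20000)
    println(s"rows in first batch: ${buildBatch(rows, numColumns = 2)}")
  }
}

With wide or complex-typed rows, the byte cap is the one that actually bounds the memory used while building a batch, which is the point of the patch.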