author     Michael Armbrust <michael@databricks.com>  2014-08-11 20:21:56 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-08-11 20:21:56 -0700
commit     bad21ed085a505559dccc06223b486170371ddd2 (patch)
tree       53dad5f03dc324e137c311beba572309416e5162 /sql
parent     c686b7dd4668b5e9fc3177f15edeae3446d2e634 (diff)
[SPARK-2650][SQL] Build column buffers in smaller batches
Author: Michael Armbrust <michael@databricks.com>

Closes #1880 from marmbrus/columnBatches and squashes the following commits:

0649987 [Michael Armbrust] add test
4756fad [Michael Armbrust] fix compilation
2314532 [Michael Armbrust] Build column buffers in smaller batches
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                              |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                           |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala   | 76
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                     | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala  |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala            |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                  |  2
7 files changed, 70 insertions, 36 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 0fd7aaaa36..35c51dec0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -25,6 +25,7 @@ import java.util.Properties
private[spark] object SQLConf {
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
+ val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
@@ -71,6 +72,9 @@ trait SQLConf {
/** When true, tables cached using the in-memory columnar caching will be compressed. */
private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean
+ /** The number of rows that will be grouped into a single batch when building the in-memory columnar buffers. */
+ private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
+
/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
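
The new spark.sql.inMemoryColumnarStorage.batchSize key makes the batch size tunable at runtime (the default used above is 1000 rows per batch). As a rough sketch, assuming a Spark 1.1-era SQLContext where configuration can be changed through the SQL SET command, and using a made-up table, data set, and batch size, tuning the setting before caching might look like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch only: the table name, data, and chosen batch size are hypothetical,
// and the exact configuration API may differ slightly between Spark versions.
case class Person(name: String, age: Int)

val sc = new SparkContext(new SparkConf().setAppName("batchSizeDemo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD   // implicit RDD[Person] -> SchemaRDD conversion in Spark 1.x

// Larger batches compress better but hold more rows in memory while being built.
sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.batchSize=10000")

sc.parallelize(1 to 100000)
  .map(i => Person("name-" + i, i % 80))
  .registerTempTable("people")

sqlContext.cacheTable("people")   // column buffers are now built 10000 rows at a time
println(sqlContext.sql("SELECT COUNT(*) FROM people").collect().head)
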
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 71d338d21d..af9f7c62a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -273,7 +273,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
currentTable.logicalPlan
case _ =>
- InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
+ InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
}
catalog.registerTable(None, tableName, asInMemoryRelation)
@@ -284,7 +284,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
table(tableName).queryExecution.analyzed match {
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
// we reregister the RDD as a table.
- case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
+ case inMem @ InMemoryRelation(_, _, _, e: ExistingRdd) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 88901debbb..3364d0e18b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -28,13 +28,14 @@ import org.apache.spark.sql.Row
import org.apache.spark.SparkConf
object InMemoryRelation {
- def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
- new InMemoryRelation(child.output, useCompression, child)()
+ def apply(useCompression: Boolean, batchSize: Int, child: SparkPlan): InMemoryRelation =
+ new InMemoryRelation(child.output, useCompression, batchSize, child)()
}
private[sql] case class InMemoryRelation(
output: Seq[Attribute],
useCompression: Boolean,
+ batchSize: Int,
child: SparkPlan)
(private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
extends LogicalPlan with MultiInstanceRelation {
@@ -43,22 +44,31 @@ private[sql] case class InMemoryRelation(
// As in Spark, the actual work of caching is lazy.
if (_cachedColumnBuffers == null) {
val output = child.output
- val cached = child.execute().mapPartitions { iterator =>
- val columnBuilders = output.map { attribute =>
- ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
- }.toArray
-
- var row: Row = null
- while (iterator.hasNext) {
- row = iterator.next()
- var i = 0
- while (i < row.length) {
- columnBuilders(i).appendFrom(row, i)
- i += 1
+ val cached = child.execute().mapPartitions { baseIterator =>
+ new Iterator[Array[ByteBuffer]] {
+ def next() = {
+ val columnBuilders = output.map { attribute =>
+ ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
+ }.toArray
+
+ var row: Row = null
+ var rowCount = 0
+
+ while (baseIterator.hasNext && rowCount < batchSize) {
+ row = baseIterator.next()
+ var i = 0
+ while (i < row.length) {
+ columnBuilders(i).appendFrom(row, i)
+ i += 1
+ }
+ rowCount += 1
+ }
+
+ columnBuilders.map(_.build())
}
- }
- Iterator.single(columnBuilders.map(_.build()))
+ def hasNext = baseIterator.hasNext
+ }
}.cache()
cached.setName(child.toString)
@@ -74,6 +84,7 @@ private[sql] case class InMemoryRelation(
new InMemoryRelation(
output.map(_.newInstance),
useCompression,
+ batchSize,
child)(
_cachedColumnBuffers).asInstanceOf[this.type]
}
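
The heart of the change is the iterator above: instead of draining a whole partition into one set of column builders (and one potentially huge Array[ByteBuffer]), it starts fresh builders for every batchSize rows and emits one built batch at a time, so only a single batch has to be held while building. A minimal, self-contained sketch of that grouping pattern, with plain ArrayBuffers standing in for Spark's ColumnBuilders and Array[Any] standing in for Row, could look like this:

import scala.collection.mutable.ArrayBuffer

// Sketch of the batching pattern used in InMemoryRelation above; ArrayBuffer
// "builders" and Array[Any] rows stand in for ColumnBuilder and Row.
def batchColumns(rows: Iterator[Array[Any]], numColumns: Int, batchSize: Int)
    : Iterator[Array[Seq[Any]]] =
  new Iterator[Array[Seq[Any]]] {
    def hasNext: Boolean = rows.hasNext

    def next(): Array[Seq[Any]] = {
      // Fresh builders for every batch, so a finished batch can be handed off
      // (cached or spilled) before the next one is assembled.
      val builders = Array.fill(numColumns)(new ArrayBuffer[Any])
      var rowCount = 0
      while (rows.hasNext && rowCount < batchSize) {
        val row = rows.next()
        var i = 0
        while (i < numColumns) {
          builders(i) += row(i)
          i += 1
        }
        rowCount += 1
      }
      builders.map(_.toSeq)
    }
  }

// Example: 10 two-column rows in batches of 4 produce 3 batches of 4, 4 and 2 rows.
val rows = (1 to 10).iterator.map(i => Array[Any](i, "row-" + i))
batchColumns(rows, numColumns = 2, batchSize = 4)
  .foreach(batch => println(batch.map(_.size).mkString(", ")))
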
@@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(
override def execute() = {
relation.cachedColumnBuffers.mapPartitions { iterator =>
- val columnBuffers = iterator.next()
- assert(!iterator.hasNext)
+ // Find the ordinals of the requested columns. If none are requested, use the first.
+ val requestedColumns =
+ if (attributes.isEmpty) {
+ Seq(0)
+ } else {
+ attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
+ }
new Iterator[Row] {
- // Find the ordinals of the requested columns. If none are requested, use the first.
- val requestedColumns =
- if (attributes.isEmpty) {
- Seq(0)
- } else {
- attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
- }
+ private[this] var columnBuffers: Array[ByteBuffer] = null
+ private[this] var columnAccessors: Seq[ColumnAccessor] = null
+ nextBatch()
+
+ private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
- val columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
- val nextRow = new GenericMutableRow(columnAccessors.length)
+ def nextBatch() = {
+ columnBuffers = iterator.next()
+ columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
+ }
override def next() = {
+ if (!columnAccessors.head.hasNext) {
+ nextBatch()
+ }
+
var i = 0
while (i < nextRow.length) {
columnAccessors(i).extractTo(nextRow, i)
@@ -114,7 +134,7 @@ private[sql] case class InMemoryColumnarTableScan(
nextRow
}
- override def hasNext = columnAccessors.head.hasNext
+ override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
}
}
}
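
On the scan side, the rewritten iterator stitches the batches back together: hasNext stays true while either the current batch still has rows or more batches remain, and next() advances to the following batch once the current accessors are exhausted. A standalone sketch of that consumption pattern, with plain column Seqs in place of real ColumnAccessors (and, like the real code, assuming each partition holds at least one batch), might be:

// Sketch of the multi-batch scan pattern above: each element of batches is one
// batch of column values, flattened back into rows on demand.
def scanBatches(batches: Iterator[Array[Seq[Any]]]): Iterator[Array[Any]] =
  new Iterator[Array[Any]] {
    private[this] var columns: Array[Iterator[Any]] = _
    nextBatch()   // eagerly open the first batch, mirroring the code above

    private def nextBatch(): Unit = {
      columns = batches.next().map(_.iterator)
    }

    def hasNext: Boolean = columns.head.hasNext || batches.hasNext

    def next(): Array[Any] = {
      if (!columns.head.hasNext) nextBatch()   // current batch exhausted, move on
      columns.map(_.next())
    }
  }

// Two batches of a two-column table: rows (1, "a"), (2, "b"), then (3, "c").
val batches = Iterator(
  Array[Seq[Any]](Seq(1, 2), Seq("a", "b")),
  Array[Seq[Any]](Seq(3), Seq("c")))
scanBatches(batches).foreach(row => println(row.mkString(", ")))   // 1, a / 2, b / 3, c
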
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index fbf9bd9dbc..befef46d93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -22,9 +22,19 @@ import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableSca
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
+case class BigData(s: String)
+
class CachedTableSuite extends QueryTest {
TestData // Load test tables.
+ test("too big for memory") {
+ val data = "*" * 10000
+ sparkContext.parallelize(1 to 1000000, 1).map(_ => BigData(data)).registerTempTable("bigData")
+ cacheTable("bigData")
+ assert(table("bigData").count() === 1000000L)
+ uncacheTable("bigData")
+ }
+
test("SPARK-1669: cacheTable should be idempotent") {
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
@@ -37,7 +47,7 @@ class CachedTableSuite extends QueryTest {
cacheTable("testData")
table("testData").queryExecution.analyzed match {
- case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
+ case InMemoryRelation(_, _, _, _: InMemoryColumnarTableScan) =>
fail("cacheTable is not idempotent")
case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index b561b44ad7..736c0f8571 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("simple columnar query") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
- val scan = InMemoryRelation(useCompression = true, plan)
+ val scan = InMemoryRelation(useCompression = true, 5, plan)
checkAnswer(scan, testData.collect().toSeq)
}
test("projection") {
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
- val scan = InMemoryRelation(useCompression = true, plan)
+ val scan = InMemoryRelation(useCompression = true, 5, plan)
checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
- val scan = InMemoryRelation(useCompression = true, plan)
+ val scan = InMemoryRelation(useCompression = true, 5, plan)
checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 82e9c1a248..3b371211e1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -137,7 +137,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
castChildOutput(p, table, child)
case p @ logical.InsertIntoTable(
- InMemoryRelation(_, _,
+ InMemoryRelation(_, _, _,
HiveTableScan(_, table, _)), _, child, _) =>
castChildOutput(p, table, child)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 85d2496a34..5fcc1bd4b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.InsertIntoTable(
- InMemoryRelation(_, _,
+ InMemoryRelation(_, _, _,
HiveTableScan(_, table, _)), partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case _ => Nil