about summary refs log tree commit diff
path: root/sql/core/src/main
diff options
context:
space:
mode:
author	Cheng Lian <lian.cs.zju@gmail.com>	2014-08-29 18:16:47 -0700
committer	Michael Armbrust <michael@databricks.com>	2014-08-29 18:16:47 -0700
commit32b18dd52cf8920903819f23e406271ecd8ac6bb (patch)
treec9aba3e54f8fbf2807754649ce622004fb10d5b6 /sql/core/src/main
parent13901764f4e9ed3de03e420d88ab42bdce5d5140 (diff)
downloadspark-32b18dd52cf8920903819f23e406271ecd8ac6bb.tar.gz
spark-32b18dd52cf8920903819f23e406271ecd8ac6bb.tar.bz2
spark-32b18dd52cf8920903819f23e406271ecd8ac6bb.zip
[SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions
Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #2213 from liancheng/spark-3320 and squashes the following commits: 45a0139 [Cheng Lian] Fixed typo in InMemoryColumnarQuerySuite f67067d [Cheng Lian] Fixed SPARK-3320
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala	| 49
1 file changed, 19 insertions(+), 30 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index bc36bacd00..cb055cd74a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan(
override def execute() = {
relation.cachedColumnBuffers.mapPartitions { iterator =>
// Find the ordinals of the requested columns. If none are requested, use the first.
- val requestedColumns =
- if (attributes.isEmpty) {
- Seq(0)
- } else {
- attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
- }
-
- new Iterator[Row] {
- private[this] var columnBuffers: Array[ByteBuffer] = null
- private[this] var columnAccessors: Seq[ColumnAccessor] = null
- nextBatch()
-
- private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
-
- def nextBatch() = {
- columnBuffers = iterator.next()
- columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
- }
+ val requestedColumns = if (attributes.isEmpty) {
+ Seq(0)
+ } else {
+ attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
+ }
- override def next() = {
- if (!columnAccessors.head.hasNext) {
- nextBatch()
- }
+ iterator
+ .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))
+ .flatMap { columnAccessors =>
+ val nextRow = new GenericMutableRow(columnAccessors.length)
+ new Iterator[Row] {
+ override def next() = {
+ var i = 0
+ while (i < nextRow.length) {
+ columnAccessors(i).extractTo(nextRow, i)
+ i += 1
+ }
+ nextRow
+ }
- var i = 0
- while (i < nextRow.length) {
- columnAccessors(i).extractTo(nextRow, i)
- i += 1
+ override def hasNext = columnAccessors.head.hasNext
}
- nextRow
}
-
- override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
- }
}
}
}