diff options
author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-08-29 18:16:47 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-08-29 18:16:47 -0700 |
commit | 32b18dd52cf8920903819f23e406271ecd8ac6bb (patch) | |
tree | c9aba3e54f8fbf2807754649ce622004fb10d5b6 /sql/core/src/main | |
parent | 13901764f4e9ed3de03e420d88ab42bdce5d5140 (diff) | |
download | spark-32b18dd52cf8920903819f23e406271ecd8ac6bb.tar.gz spark-32b18dd52cf8920903819f23e406271ecd8ac6bb.tar.bz2 spark-32b18dd52cf8920903819f23e406271ecd8ac6bb.zip |
[SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #2213 from liancheng/spark-3320 and squashes the following commits:
45a0139 [Cheng Lian] Fixed typo in InMemoryColumnarQuerySuite
f67067d [Cheng Lian] Fixed SPARK-3320
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala | 49 |
1 files changed, 19 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index bc36bacd00..cb055cd74a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan( override def execute() = { relation.cachedColumnBuffers.mapPartitions { iterator => // Find the ordinals of the requested columns. If none are requested, use the first. - val requestedColumns = - if (attributes.isEmpty) { - Seq(0) - } else { - attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) - } - - new Iterator[Row] { - private[this] var columnBuffers: Array[ByteBuffer] = null - private[this] var columnAccessors: Seq[ColumnAccessor] = null - nextBatch() - - private[this] val nextRow = new GenericMutableRow(columnAccessors.length) - - def nextBatch() = { - columnBuffers = iterator.next() - columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_)) - } + val requestedColumns = if (attributes.isEmpty) { + Seq(0) + } else { + attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) + } - override def next() = { - if (!columnAccessors.head.hasNext) { - nextBatch() - } + iterator + .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_))) + .flatMap { columnAccessors => + val nextRow = new GenericMutableRow(columnAccessors.length) + new Iterator[Row] { + override def next() = { + var i = 0 + while (i < nextRow.length) { + columnAccessors(i).extractTo(nextRow, i) + i += 1 + } + nextRow + } - var i = 0 - while (i < nextRow.length) { - columnAccessors(i).extractTo(nextRow, i) - i += 1 + override def hasNext = columnAccessors.head.hasNext } - nextRow } - - override def hasNext = columnAccessors.head.hasNext || iterator.hasNext - } } } } |