diff options
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala | 49 |
1 files changed, 19 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index bc36bacd00..cb055cd74a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan( override def execute() = { relation.cachedColumnBuffers.mapPartitions { iterator => // Find the ordinals of the requested columns. If none are requested, use the first. - val requestedColumns = - if (attributes.isEmpty) { - Seq(0) - } else { - attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) - } - - new Iterator[Row] { - private[this] var columnBuffers: Array[ByteBuffer] = null - private[this] var columnAccessors: Seq[ColumnAccessor] = null - nextBatch() - - private[this] val nextRow = new GenericMutableRow(columnAccessors.length) - - def nextBatch() = { - columnBuffers = iterator.next() - columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_)) - } + val requestedColumns = if (attributes.isEmpty) { + Seq(0) + } else { + attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) + } - override def next() = { - if (!columnAccessors.head.hasNext) { - nextBatch() - } + iterator + .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_))) + .flatMap { columnAccessors => + val nextRow = new GenericMutableRow(columnAccessors.length) + new Iterator[Row] { + override def next() = { + var i = 0 + while (i < nextRow.length) { + columnAccessors(i).extractTo(nextRow, i) + i += 1 + } + nextRow + } - var i = 0 - while (i < nextRow.length) { - columnAccessors(i).extractTo(nextRow, i) - i += 1 + override def hasNext = columnAccessors.head.hasNext } - nextRow } - - override def hasNext = columnAccessors.head.hasNext || iterator.hasNext - } } } } |