aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala49
1 files changed, 19 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index bc36bacd00..cb055cd74a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan(
override def execute() = {
relation.cachedColumnBuffers.mapPartitions { iterator =>
// Find the ordinals of the requested columns. If none are requested, use the first.
- val requestedColumns =
- if (attributes.isEmpty) {
- Seq(0)
- } else {
- attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
- }
-
- new Iterator[Row] {
- private[this] var columnBuffers: Array[ByteBuffer] = null
- private[this] var columnAccessors: Seq[ColumnAccessor] = null
- nextBatch()
-
- private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
-
- def nextBatch() = {
- columnBuffers = iterator.next()
- columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
- }
+ val requestedColumns = if (attributes.isEmpty) {
+ Seq(0)
+ } else {
+ attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
+ }
- override def next() = {
- if (!columnAccessors.head.hasNext) {
- nextBatch()
- }
+ iterator
+ .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))
+ .flatMap { columnAccessors =>
+ val nextRow = new GenericMutableRow(columnAccessors.length)
+ new Iterator[Row] {
+ override def next() = {
+ var i = 0
+ while (i < nextRow.length) {
+ columnAccessors(i).extractTo(nextRow, i)
+ i += 1
+ }
+ nextRow
+ }
- var i = 0
- while (i < nextRow.length) {
- columnAccessors(i).extractTo(nextRow, i)
- i += 1
+ override def hasNext = columnAccessors.head.hasNext
}
- nextRow
}
-
- override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
- }
}
}
}