-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java              176
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java             296
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java       179
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java        175
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala  320
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala      317
6 files changed, 1463 insertions, 0 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
new file mode 100644
index 0000000000..d9dde92ceb
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * This class represents a column of values and provides the main APIs to access the data
+ * values. It supports all the types and contains get/put APIs as well as their batched versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Most of the APIs take the rowId as a parameter. This is the local 0-based row id for values
+ * in the current batch.
+ *
+ * A ColumnVector should be considered immutable once it has been read. In other words, it is not
+ * valid to call put APIs after reads until reset() is called.
+ */
+public abstract class ColumnVector {
+ /**
+ * Allocates a column with each element of size `width` either on or off heap.
+ */
+ public static ColumnVector allocate(int capacity, DataType type, boolean offHeap) {
+ if (offHeap) {
+ return new OffHeapColumnVector(capacity, type);
+ } else {
+ return new OnHeapColumnVector(capacity, type);
+ }
+ }
+
+ public final DataType dataType() { return type; }
+
+ /**
+ * Resets this column for writing. The currently stored values are no longer accessible.
+ */
+ public void reset() {
+ numNulls = 0;
+ if (anyNullsSet) {
+ putNotNulls(0, capacity);
+ anyNullsSet = false;
+ }
+ }
+
+ /**
+ * Cleans up memory for this column. The column is not usable after this.
+ * TODO: this should probably have ref-counted semantics.
+ */
+ public abstract void close();
+
+ /**
+ * Returns the number of nulls in this column.
+ */
+ public final int numNulls() { return numNulls; }
+
+ /**
+   * Returns true if any of the null indicators are set for this column. This can be used
+   * as an optimization to avoid having to clear the null indicators.
+ */
+ public final boolean anyNullsSet() { return anyNullsSet; }
+
+ /**
+   * Returns the off-heap pointer for the array backing the NULLs or values buffer, respectively.
+   * Only valid to call on off-heap columns.
+ */
+ public abstract long nullsNativeAddress();
+ public abstract long valuesNativeAddress();
+
+ /**
+ * Sets the value at rowId to null/not null.
+ */
+ public abstract void putNotNull(int rowId);
+ public abstract void putNull(int rowId);
+
+ /**
+ * Sets the values from [rowId, rowId + count) to null/not null.
+ */
+ public abstract void putNulls(int rowId, int count);
+ public abstract void putNotNulls(int rowId, int count);
+
+ /**
+ * Returns whether the value at rowId is NULL.
+ */
+ public abstract boolean getIsNull(int rowId);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract void putInt(int rowId, int value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putInts(int rowId, int count, int value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+ */
+ public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
+
+ /**
+   * Sets values from [rowId, rowId + count) to the ints in src starting at byte offset srcIndex.
+   * The data in src must be 4-byte little-endian ints.
+ */
+ public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
+ /**
+ * Returns the integer for rowId.
+ */
+ public abstract int getInt(int rowId);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
+ public abstract void putDouble(int rowId, double value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putDoubles(int rowId, int count, double value);
+
+ /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count).
+   * src should contain `count` doubles written in IEEE format.
+ */
+ public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
+
+ /**
+   * Sets values from [rowId, rowId + count) to the doubles in src starting at byte offset srcIndex.
+   * The data in src must be IEEE-formatted doubles.
+ */
+ public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
+
+ /**
+ * Returns the double for rowId.
+ */
+ public abstract double getDouble(int rowId);
+
+ /**
+ * Maximum number of rows that can be stored in this column.
+ */
+ protected final int capacity;
+
+ /**
+ * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
+ */
+ protected int numNulls;
+
+ /**
+ * True if there is at least one NULL byte set. This is an optimization for the writer, to skip
+ * having to clear NULL bits.
+ */
+ protected boolean anyNullsSet;
+
+ /**
+ * Data type for this column.
+ */
+ protected final DataType type;
+
+ protected ColumnVector(int capacity, DataType type) {
+ this.capacity = capacity;
+ this.type = type;
+ }
+}
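
For reference, a minimal usage sketch of the ColumnVector API introduced above. This is not part of the patch; the wrapper class name is made up, and it assumes the DataTypes constants from org.apache.spark.sql.types.

    import org.apache.spark.sql.execution.vectorized.ColumnVector;
    import org.apache.spark.sql.types.DataTypes;

    public class ColumnVectorExample {
      public static void main(String[] args) {
        // Allocate an on-heap int column with room for 1024 rows.
        ColumnVector col = ColumnVector.allocate(1024, DataTypes.IntegerType, false);

        // Batched puts are preferred over per-row puts whenever possible.
        int[] values = {1, 2, 3, 4, 5};
        col.putInts(0, values.length, values, 0);
        col.putNull(5);      // row 5 is NULL
        col.putInt(6, 42);   // single-row put

        // Readers check nulls via getIsNull (or skip the checks when anyNullsSet() is false).
        for (int r = 0; r < 7; ++r) {
          System.out.println(col.getIsNull(r) ? "NULL" : String.valueOf(col.getInt(r)));
        }

        // reset() makes the column writable again; close() releases the backing memory.
        col.reset();
        col.close();
      }
    }
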
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
new file mode 100644
index 0000000000..47defac453
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * This class is the in-memory representation of rows as they are streamed through operators. It
+ * is designed to maximize CPU efficiency rather than storage footprint. Since it is expected that
+ * each operator allocates one of these objects, the storage footprint on the task is negligible.
+ *
+ * The layout is columnar, with values encoded in their native format. Each batch contains
+ * a horizontal partition of the data, split into columns.
+ *
+ * The ColumnarBatch supports either on-heap or off-heap mode with a (mostly) identical API.
+ *
+ * TODO:
+ * - There are many TODOs for the existing APIs. They should throw a not implemented exception.
+ * - Compaction: The batch and columns should be able to compact based on a selection vector.
+ */
+public final class ColumnarBatch {
+ private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
+
+ private final StructType schema;
+ private final int capacity;
+ private int numRows;
+ private final ColumnVector[] columns;
+
+ // True if the row is filtered.
+ private final boolean[] filteredRows;
+
+ // Total number of rows that have been filtered.
+ private int numRowsFiltered = 0;
+
+ public static ColumnarBatch allocate(StructType schema, boolean offHeap) {
+ return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, offHeap);
+ }
+
+ public static ColumnarBatch allocate(StructType schema, boolean offHeap, int maxRows) {
+ return new ColumnarBatch(schema, maxRows, offHeap);
+ }
+
+ /**
+ * Called to close all the columns in this batch. It is not valid to access the data after
+   * calling this. This must be called at the end to clean up memory allocations.
+ */
+ public void close() {
+ for (ColumnVector c: columns) {
+ c.close();
+ }
+ }
+
+ /**
+   * Adapter class to interoperate with existing components that expect an InternalRow. A lot of
+ * performance is lost with this translation.
+ */
+ public final class Row extends InternalRow {
+ private int rowId;
+
+ /**
+ * Marks this row as being filtered out. This means a subsequent iteration over the rows
+ * in this batch will not include this row.
+ */
+ public final void markFiltered() {
+ ColumnarBatch.this.markFiltered(rowId);
+ }
+
+ @Override
+ public final int numFields() {
+ return ColumnarBatch.this.numCols();
+ }
+
+ @Override
+ public final InternalRow copy() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final boolean anyNull() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final boolean isNullAt(int ordinal) {
+ return ColumnarBatch.this.column(ordinal).getIsNull(rowId);
+ }
+
+ @Override
+ public final boolean getBoolean(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final byte getByte(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final short getShort(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final int getInt(int ordinal) {
+ return ColumnarBatch.this.column(ordinal).getInt(rowId);
+ }
+
+ @Override
+ public final long getLong(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final float getFloat(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final double getDouble(int ordinal) {
+ return ColumnarBatch.this.column(ordinal).getDouble(rowId);
+ }
+
+ @Override
+ public final Decimal getDecimal(int ordinal, int precision, int scale) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final UTF8String getUTF8String(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final byte[] getBinary(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final CalendarInterval getInterval(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final InternalRow getStruct(int ordinal, int numFields) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final ArrayData getArray(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final MapData getMap(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final Object get(int ordinal, DataType dataType) {
+ throw new NotImplementedException();
+ }
+ }
+
+ /**
+ * Returns an iterator over the rows in this batch. This skips rows that are filtered out.
+ */
+ public Iterator<Row> rowIterator() {
+ final int maxRows = ColumnarBatch.this.numRows();
+ final Row row = new Row();
+ return new Iterator<Row>() {
+ int rowId = 0;
+
+ @Override
+ public boolean hasNext() {
+ while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
+ ++rowId;
+ }
+ return rowId < maxRows;
+ }
+
+ @Override
+ public Row next() {
+ assert(hasNext());
+ while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
+ ++rowId;
+ }
+ row.rowId = rowId++;
+ return row;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Resets the batch for writing.
+ */
+ public void reset() {
+ for (int i = 0; i < numCols(); ++i) {
+ columns[i].reset();
+ }
+ if (this.numRowsFiltered > 0) {
+ Arrays.fill(filteredRows, false);
+ }
+ this.numRows = 0;
+ this.numRowsFiltered = 0;
+ }
+
+ /**
+ * Sets the number of rows that are valid.
+ */
+ public void setNumRows(int numRows) {
+ assert(numRows <= this.capacity);
+ this.numRows = numRows;
+ }
+
+ /**
+ * Returns the number of columns that make up this batch.
+ */
+ public int numCols() { return columns.length; }
+
+ /**
+   * Returns the number of rows for reading, including filtered rows.
+ */
+ public int numRows() { return numRows; }
+
+ /**
+   * Returns the number of valid rows, i.e. rows that have not been filtered out.
+ */
+ public int numValidRows() {
+ assert(numRowsFiltered <= numRows);
+ return numRows - numRowsFiltered;
+ }
+
+ /**
+ * Returns the max capacity (in number of rows) for this batch.
+ */
+ public int capacity() { return capacity; }
+
+ /**
+ * Returns the column at `ordinal`.
+ */
+ public ColumnVector column(int ordinal) { return columns[ordinal]; }
+
+ /**
+ * Marks this row as being filtered out. This means a subsequent iteration over the rows
+ * in this batch will not include this row.
+ */
+ public final void markFiltered(int rowId) {
+ assert(filteredRows[rowId] == false);
+ filteredRows[rowId] = true;
+ ++numRowsFiltered;
+ }
+
+ private ColumnarBatch(StructType schema, int maxRows, boolean offHeap) {
+ this.schema = schema;
+ this.capacity = maxRows;
+ this.columns = new ColumnVector[schema.size()];
+ this.filteredRows = new boolean[maxRows];
+
+ for (int i = 0; i < schema.fields().length; ++i) {
+ StructField field = schema.fields()[i];
+ columns[i] = ColumnVector.allocate(maxRows, field.dataType(), offHeap);
+ }
+ }
+}
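
A hypothetical end-to-end sketch of the ColumnarBatch write/read cycle described above (the example class name is made up; DataTypes and StructType come from org.apache.spark.sql.types):

    import java.util.Iterator;

    import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class ColumnarBatchExample {
      public static void main(String[] args) {
        StructType schema = new StructType()
            .add("intCol", DataTypes.IntegerType)
            .add("doubleCol", DataTypes.DoubleType);

        // Allocate an on-heap batch (default capacity is 4 * 1024 rows).
        ColumnarBatch batch = ColumnarBatch.allocate(schema, false);

        // A writer fills the columns directly and then publishes the row count.
        batch.column(0).putInt(0, 1);
        batch.column(1).putDouble(0, 1.1);
        batch.column(0).putNull(1);
        batch.column(1).putDouble(1, 2.2);
        batch.setNumRows(2);

        // Filter out row 1; rowIterator() skips filtered rows.
        batch.markFiltered(1);

        Iterator<ColumnarBatch.Row> it = batch.rowIterator();
        while (it.hasNext()) {
          ColumnarBatch.Row row = it.next();
          System.out.println(row.getInt(0) + ", " + row.getDouble(1));
        }

        // reset() clears the row count and filter bits so the batch can be refilled.
        batch.reset();
        batch.close();
      }
    }
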
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
new file mode 100644
index 0000000000..2a9a2d1104
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.nio.ByteOrder;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.unsafe.Platform;
+
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * Column data backed using offheap memory.
+ */
+public final class OffHeapColumnVector extends ColumnVector {
+  // The data stored in these two allocations needs to maintain a stable binary format so we can
+  // directly pass these buffers to external components.
+ private long nulls;
+ private long data;
+
+ protected OffHeapColumnVector(int capacity, DataType type) {
+ super(capacity, type);
+ if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
+ throw new NotImplementedException("Only little endian is supported.");
+ }
+
+ this.nulls = Platform.allocateMemory(capacity);
+ if (type instanceof IntegerType) {
+ this.data = Platform.allocateMemory(capacity * 4);
+ } else if (type instanceof DoubleType) {
+ this.data = Platform.allocateMemory(capacity * 8);
+ } else {
+ throw new RuntimeException("Unhandled " + type);
+ }
+ reset();
+ }
+
+ @Override
+ public final long valuesNativeAddress() {
+ return data;
+ }
+
+ @Override
+ public long nullsNativeAddress() {
+ return nulls;
+ }
+
+ @Override
+ public final void close() {
+ Platform.freeMemory(nulls);
+ Platform.freeMemory(data);
+ nulls = 0;
+ data = 0;
+ }
+
+ //
+ // APIs dealing with nulls
+ //
+
+ @Override
+ public final void putNotNull(int rowId) {
+ Platform.putByte(null, nulls + rowId, (byte) 0);
+ }
+
+ @Override
+ public final void putNull(int rowId) {
+ Platform.putByte(null, nulls + rowId, (byte) 1);
+ ++numNulls;
+ anyNullsSet = true;
+ }
+
+ @Override
+ public final void putNulls(int rowId, int count) {
+ long offset = nulls + rowId;
+ for (int i = 0; i < count; ++i, ++offset) {
+ Platform.putByte(null, offset, (byte) 1);
+ }
+ anyNullsSet = true;
+ numNulls += count;
+ }
+
+ @Override
+ public final void putNotNulls(int rowId, int count) {
+ long offset = nulls + rowId;
+ for (int i = 0; i < count; ++i, ++offset) {
+ Platform.putByte(null, offset, (byte) 0);
+ }
+ }
+
+ @Override
+ public final boolean getIsNull(int rowId) {
+ return Platform.getByte(null, nulls + rowId) == 1;
+ }
+
+ //
+ // APIs dealing with ints
+ //
+
+ @Override
+ public final void putInt(int rowId, int value) {
+ Platform.putInt(null, data + 4 * rowId, value);
+ }
+
+ @Override
+ public final void putInts(int rowId, int count, int value) {
+ long offset = data + 4 * rowId;
+ for (int i = 0; i < count; ++i, offset += 4) {
+ Platform.putInt(null, offset, value);
+ }
+ }
+
+ @Override
+ public final void putInts(int rowId, int count, int[] src, int srcIndex) {
+ Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4,
+ null, data + 4 * rowId, count * 4);
+ }
+
+ @Override
+ public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+ null, data + 4 * rowId, count * 4);
+ }
+
+ @Override
+ public final int getInt(int rowId) {
+ return Platform.getInt(null, data + 4 * rowId);
+ }
+
+ //
+ // APIs dealing with doubles
+ //
+
+ @Override
+ public final void putDouble(int rowId, double value) {
+ Platform.putDouble(null, data + rowId * 8, value);
+ }
+
+ @Override
+ public final void putDoubles(int rowId, int count, double value) {
+ long offset = data + 8 * rowId;
+ for (int i = 0; i < count; ++i, offset += 8) {
+ Platform.putDouble(null, offset, value);
+ }
+ }
+
+ @Override
+ public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8,
+ null, data + 8 * rowId, count * 8);
+ }
+
+ @Override
+ public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+ null, data + rowId * 8, count * 8);
+ }
+
+ @Override
+ public final double getDouble(int rowId) {
+ return Platform.getDouble(null, data + rowId * 8);
+ }
+}
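
A sketch of how an external reader could consume an off-heap column directly through its native addresses, following the layout used above (one NULL byte per row at nullsNativeAddress(), 4 bytes per int at valuesNativeAddress()); the example class name is hypothetical:

    import org.apache.spark.sql.execution.vectorized.ColumnVector;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.unsafe.Platform;

    public class OffHeapDirectAccessExample {
      public static void main(String[] args) {
        ColumnVector col = ColumnVector.allocate(16, DataTypes.IntegerType, true /* offHeap */);
        for (int i = 0; i < 16; ++i) {
          col.putInt(i, i * i);
        }
        col.putNull(3);

        long nulls = col.nullsNativeAddress();
        long values = col.valuesNativeAddress();
        long sum = 0;
        for (int i = 0; i < 16; ++i) {
          // NULL byte: 0 = not null, 1 = null; values are laid out as consecutive 4-byte ints.
          if (Platform.getByte(null, nulls + i) == 0) {
            sum += Platform.getInt(null, values + 4L * i);
          }
        }
        System.out.println("sum = " + sum);
        col.close();   // frees both off-heap allocations
      }
    }
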
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
new file mode 100644
index 0000000000..a7b3addf11
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.unsafe.Platform;
+
+import java.nio.ByteBuffer;
+import java.nio.DoubleBuffer;
+import java.util.Arrays;
+
+/**
+ * A column backed by in-memory JVM arrays. This stores the NULLs as one byte per value
+ * and uses a Java array for the values.
+ */
+public final class OnHeapColumnVector extends ColumnVector {
+  // The data stored in these arrays needs to maintain a stable binary format so we can
+  // directly pass these buffers to external components.
+
+ // This is faster than a boolean array and we optimize this over memory footprint.
+ private byte[] nulls;
+
+ // Array for each type. Only 1 is populated for any type.
+ private int[] intData;
+ private double[] doubleData;
+
+ protected OnHeapColumnVector(int capacity, DataType type) {
+ super(capacity, type);
+ if (type instanceof IntegerType) {
+ this.intData = new int[capacity];
+ } else if (type instanceof DoubleType) {
+ this.doubleData = new double[capacity];
+ } else {
+ throw new RuntimeException("Unhandled " + type);
+ }
+ this.nulls = new byte[capacity];
+ reset();
+ }
+
+ @Override
+ public final long valuesNativeAddress() {
+ throw new RuntimeException("Cannot get native address for on heap column");
+ }
+ @Override
+ public final long nullsNativeAddress() {
+ throw new RuntimeException("Cannot get native address for on heap column");
+ }
+
+ @Override
+ public final void close() {
+ nulls = null;
+ intData = null;
+ doubleData = null;
+ }
+
+
+ //
+ // APIs dealing with nulls
+ //
+
+ @Override
+ public final void putNotNull(int rowId) {
+ nulls[rowId] = (byte)0;
+ }
+
+ @Override
+ public final void putNull(int rowId) {
+ nulls[rowId] = (byte)1;
+ ++numNulls;
+ anyNullsSet = true;
+ }
+
+ @Override
+ public final void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; ++i) {
+ nulls[rowId + i] = (byte)1;
+ }
+ anyNullsSet = true;
+ numNulls += count;
+ }
+
+ @Override
+ public final void putNotNulls(int rowId, int count) {
+ for (int i = 0; i < count; ++i) {
+ nulls[rowId + i] = (byte)0;
+ }
+ }
+
+ @Override
+ public final boolean getIsNull(int rowId) {
+ return nulls[rowId] == 1;
+ }
+
+ //
+ // APIs dealing with Ints
+ //
+
+ @Override
+ public final void putInt(int rowId, int value) {
+ intData[rowId] = value;
+ }
+
+ @Override
+ public final void putInts(int rowId, int count, int value) {
+ for (int i = 0; i < count; ++i) {
+ intData[i + rowId] = value;
+ }
+ }
+
+ @Override
+ public final void putInts(int rowId, int count, int[] src, int srcIndex) {
+ System.arraycopy(src, srcIndex, intData, rowId, count);
+ }
+
+ @Override
+ public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+ for (int i = 0; i < count; ++i) {
+      intData[i + rowId] = Platform.getInt(src, srcOffset);
+ srcIndex += 4;
+ srcOffset += 4;
+ }
+ }
+
+ @Override
+ public final int getInt(int rowId) {
+ return intData[rowId];
+ }
+
+ //
+ // APIs dealing with doubles
+ //
+
+ @Override
+ public final void putDouble(int rowId, double value) {
+ doubleData[rowId] = value;
+ }
+
+ @Override
+ public final void putDoubles(int rowId, int count, double value) {
+ Arrays.fill(doubleData, rowId, rowId + count, value);
+ }
+
+ @Override
+ public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+ System.arraycopy(src, srcIndex, doubleData, rowId, count);
+ }
+
+ @Override
+ public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+ Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
+ Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+ }
+
+ @Override
+ public final double getDouble(int rowId) {
+ return doubleData[rowId];
+ }
+}
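
A small worked example of the putIntsLittleEndian semantics shared by both implementations (srcIndex is a byte offset into src; values are decoded as 4-byte little-endian ints). The class name is made up, and it assumes a little-endian host JVM, matching the check in the off-heap constructor:

    import org.apache.spark.sql.execution.vectorized.ColumnVector;
    import org.apache.spark.sql.types.DataTypes;

    public class LittleEndianPutExample {
      public static void main(String[] args) {
        ColumnVector col = ColumnVector.allocate(4, DataTypes.IntegerType, false);

        // Two 4-byte little-endian ints: 263 = 7 + (1 << 8) and 65542 = 6 + (1 << 16).
        byte[] src = {7, 1, 0, 0, 6, 0, 1, 0};
        col.putIntsLittleEndian(0, 2, src, 0);   // start at byte offset 0, copy 2 ints

        System.out.println(col.getInt(0));   // 263
        System.out.println(col.getInt(1));   // 65542
        col.close();
      }
    }
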
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
new file mode 100644
index 0000000000..e28153d12a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.vectorized.ColumnVector
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Benchmark
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Benchmark of low-level memory access using different ways to manage buffers.
+ */
+object ColumnarBatchBenchmark {
+
+ // This benchmark reads and writes an array of ints.
+ // TODO: there is a big (2x) penalty for a random access API for off heap.
+  // Note: be careful when modifying this code. It's hard to reason about the JIT.
+ def intAccess(iters: Long): Unit = {
+ val count = 8 * 1000
+
+ // Accessing a java array.
+ val javaArray = { i: Int =>
+ val data = new Array[Int](count)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ data(i) = i
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ sum += data(i)
+ i += 1
+ }
+ }
+ }
+
+ // Accessing ByteBuffers
+ val byteBufferUnsafe = { i: Int =>
+ val data = ByteBuffer.allocate(count * 4)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ Platform.putInt(data.array(), Platform.BYTE_ARRAY_OFFSET + i * 4, i)
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ sum += Platform.getInt(data.array(), Platform.BYTE_ARRAY_OFFSET + i * 4)
+ i += 1
+ }
+ }
+ }
+
+ // Accessing offheap byte buffers
+ val directByteBuffer = { i: Int =>
+ val data = ByteBuffer.allocateDirect(count * 4).asIntBuffer()
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ data.put(i)
+ i += 1
+ }
+ data.rewind()
+ i = 0
+ while (i < count) {
+ sum += data.get()
+ i += 1
+ }
+ data.rewind()
+ }
+ }
+
+ // Accessing ByteBuffer using the typed APIs
+ val byteBufferApi = { i: Int =>
+ val data = ByteBuffer.allocate(count * 4)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ data.putInt(i)
+ i += 1
+ }
+ data.rewind()
+ i = 0
+ while (i < count) {
+ sum += data.getInt()
+ i += 1
+ }
+ data.rewind()
+ }
+ }
+
+ // Using unsafe memory
+ val unsafeBuffer = { i: Int =>
+ val data: Long = Platform.allocateMemory(count * 4)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var ptr = data
+ var i = 0
+ while (i < count) {
+ Platform.putInt(null, ptr, i)
+ ptr += 4
+ i += 1
+ }
+ ptr = data
+ i = 0
+ while (i < count) {
+ sum += Platform.getInt(null, ptr)
+ ptr += 4
+ i += 1
+ }
+ }
+ }
+
+ // Access through the column API with on heap memory
+ val columnOnHeap = { i: Int =>
+ val col = ColumnVector.allocate(count, IntegerType, false)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ col.putInt(i, i)
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ sum += col.getInt(i)
+ i += 1
+ }
+ }
+ col.close
+ }
+
+ // Access through the column API with off heap memory
+ def columnOffHeap = { i: Int => {
+ val col = ColumnVector.allocate(count, IntegerType, true)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ col.putInt(i, i)
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ sum += col.getInt(i)
+ i += 1
+ }
+ }
+ col.close
+ }}
+
+ // Access by directly getting the buffer backing the column.
+ val columnOffheapDirect = { i: Int =>
+ val col = ColumnVector.allocate(count, IntegerType, true)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var addr = col.valuesNativeAddress()
+ var i = 0
+ while (i < count) {
+ Platform.putInt(null, addr, i)
+ addr += 4
+ i += 1
+ }
+ i = 0
+ addr = col.valuesNativeAddress()
+ while (i < count) {
+ sum += Platform.getInt(null, addr)
+ addr += 4
+ i += 1
+ }
+ }
+ col.close
+ }
+
+ // Access by going through a batch of unsafe rows.
+ val unsafeRowOnheap = { i: Int =>
+ val buffer = new Array[Byte](count * 16)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ val row = new UnsafeRow(1)
+ var i = 0
+ while (i < count) {
+ row.pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * 16, 16)
+ row.setInt(0, i)
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ row.pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * 16, 16)
+ sum += row.getInt(0)
+ i += 1
+ }
+ }
+ }
+
+ // Access by going through a batch of unsafe rows.
+ val unsafeRowOffheap = { i: Int =>
+ val buffer = Platform.allocateMemory(count * 16)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ val row = new UnsafeRow(1)
+ var i = 0
+ while (i < count) {
+ row.pointTo(null, buffer + i * 16, 16)
+ row.setInt(0, i)
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ row.pointTo(null, buffer + i * 16, 16)
+ sum += row.getInt(0)
+ i += 1
+ }
+ }
+ Platform.freeMemory(buffer)
+ }
+
+ /*
+ Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+ Int Read/Write: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ -------------------------------------------------------------------------
+ Java Array 248.8 1317.04 1.00 X
+ ByteBuffer Unsafe 435.6 752.25 0.57 X
+ ByteBuffer API 1752.0 187.03 0.14 X
+ DirectByteBuffer 595.4 550.35 0.42 X
+ Unsafe Buffer 235.2 1393.20 1.06 X
+ Column(on heap) 189.8 1726.45 1.31 X
+ Column(off heap) 408.4 802.35 0.61 X
+ Column(off heap direct) 237.6 1379.12 1.05 X
+ UnsafeRow (on heap) 414.6 790.35 0.60 X
+ UnsafeRow (off heap) 487.2 672.58 0.51 X
+ */
+ val benchmark = new Benchmark("Int Read/Write", count * iters)
+ benchmark.addCase("Java Array")(javaArray)
+ benchmark.addCase("ByteBuffer Unsafe")(byteBufferUnsafe)
+ benchmark.addCase("ByteBuffer API")(byteBufferApi)
+ benchmark.addCase("DirectByteBuffer")(directByteBuffer)
+ benchmark.addCase("Unsafe Buffer")(unsafeBuffer)
+ benchmark.addCase("Column(on heap)")(columnOnHeap)
+ benchmark.addCase("Column(off heap)")(columnOffHeap)
+ benchmark.addCase("Column(off heap direct)")(columnOffheapDirect)
+ benchmark.addCase("UnsafeRow (on heap)")(unsafeRowOnheap)
+ benchmark.addCase("UnsafeRow (off heap)")(unsafeRowOffheap)
+ benchmark.run()
+ }
+
+ def booleanAccess(iters: Int): Unit = {
+ val count = 8 * 1024
+ val benchmark = new Benchmark("Boolean Read/Write", iters * count)
+ benchmark.addCase("Bitset") { i: Int => {
+ val b = new BitSet(count)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ if (i % 2 == 0) b.set(i)
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ if (b.get(i)) sum += 1
+ i += 1
+ }
+ }
+ }}
+
+ benchmark.addCase("Byte Array") { i: Int => {
+ val b = new Array[Byte](count)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ if (i % 2 == 0) b(i) = 1;
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ if (b(i) == 1) sum += 1
+ i += 1
+ }
+ }
+ }}
+ /*
+ Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+ Boolean Read/Write: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ -------------------------------------------------------------------------
+ Bitset 895.88 374.54 1.00 X
+ Byte Array 578.96 579.56 1.55 X
+ */
+ benchmark.run()
+ }
+
+ def main(args: Array[String]): Unit = {
+ intAccess(1024 * 40)
+ booleanAccess(1024 * 40)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
new file mode 100644
index 0000000000..305a83e3e4
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
+import org.apache.spark.unsafe.Platform
+
+class ColumnarBatchSuite extends SparkFunSuite {
+ test("Null Apis") {
+ (false :: true :: Nil).foreach { offHeap => {
+ val reference = mutable.ArrayBuffer.empty[Boolean]
+
+ val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+ var idx = 0
+ assert(column.anyNullsSet() == false)
+
+ column.putNotNull(idx)
+ reference += false
+ idx += 1
+ assert(column.anyNullsSet() == false)
+
+ column.putNull(idx)
+ reference += true
+ idx += 1
+ assert(column.anyNullsSet() == true)
+ assert(column.numNulls() == 1)
+
+ column.putNulls(idx, 3)
+ reference += true
+ reference += true
+ reference += true
+ idx += 3
+ assert(column.anyNullsSet() == true)
+
+ column.putNotNulls(idx, 4)
+ reference += false
+ reference += false
+ reference += false
+ reference += false
+ idx += 4
+ assert(column.anyNullsSet() == true)
+ assert(column.numNulls() == 4)
+
+ reference.zipWithIndex.foreach { v =>
+ assert(v._1 == column.getIsNull(v._2))
+ if (offHeap) {
+ val addr = column.nullsNativeAddress()
+ assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
+ }
+ }
+ column.close
+ }}
+ }
+
+ test("Int Apis") {
+ (false :: true :: Nil).foreach { offHeap => {
+ val seed = System.currentTimeMillis()
+ val random = new Random(seed)
+ val reference = mutable.ArrayBuffer.empty[Int]
+
+ val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+ var idx = 0
+
+ val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray
+ column.putInts(idx, 2, values, 0)
+ reference += 1
+ reference += 2
+ idx += 2
+
+ column.putInts(idx, 3, values, 2)
+ reference += 3
+ reference += 4
+ reference += 5
+ idx += 3
+
+ val littleEndian = new Array[Byte](8)
+ littleEndian(0) = 7
+ littleEndian(1) = 1
+ littleEndian(4) = 6
+ littleEndian(6) = 1
+
+ column.putIntsLittleEndian(idx, 1, littleEndian, 4)
+ column.putIntsLittleEndian(idx + 1, 1, littleEndian, 0)
+ reference += 6 + (1 << 16)
+ reference += 7 + (1 << 8)
+ idx += 2
+
+ column.putIntsLittleEndian(idx, 2, littleEndian, 0)
+ reference += 7 + (1 << 8)
+ reference += 6 + (1 << 16)
+ idx += 2
+
+ while (idx < column.capacity) {
+ val single = random.nextBoolean()
+ if (single) {
+ val v = random.nextInt()
+ column.putInt(idx, v)
+ reference += v
+ idx += 1
+ } else {
+ val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+ column.putInts(idx, n, n + 1)
+ var i = 0
+ while (i < n) {
+ reference += (n + 1)
+ i += 1
+ }
+ idx += n
+ }
+ }
+
+ reference.zipWithIndex.foreach { v =>
+ assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
+ if (offHeap) {
+ val addr = column.valuesNativeAddress()
+ assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
+ }
+ }
+ column.close
+ }}
+ }
+
+ test("Double APIs") {
+ (false :: true :: Nil).foreach { offHeap => {
+ val seed = System.currentTimeMillis()
+ val random = new Random(seed)
+ val reference = mutable.ArrayBuffer.empty[Double]
+
+ val column = ColumnVector.allocate(1024, DoubleType, offHeap)
+ var idx = 0
+
+ val values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray
+ column.putDoubles(idx, 2, values, 0)
+ reference += 1.0
+ reference += 2.0
+ idx += 2
+
+ column.putDoubles(idx, 3, values, 2)
+ reference += 3.0
+ reference += 4.0
+ reference += 5.0
+ idx += 3
+
+ val buffer = new Array[Byte](16)
+ Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
+ Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)
+
+ column.putDoubles(idx, 1, buffer, 8)
+ column.putDoubles(idx + 1, 1, buffer, 0)
+ reference += 1.123
+ reference += 2.234
+ idx += 2
+
+ column.putDoubles(idx, 2, buffer, 0)
+ reference += 2.234
+ reference += 1.123
+ idx += 2
+
+ while (idx < column.capacity) {
+ val single = random.nextBoolean()
+ if (single) {
+ val v = random.nextDouble()
+ column.putDouble(idx, v)
+ reference += v
+ idx += 1
+ } else {
+ val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+ val v = random.nextDouble()
+ column.putDoubles(idx, n, v)
+ var i = 0
+ while (i < n) {
+ reference += v
+ i += 1
+ }
+ idx += n
+ }
+ }
+
+ reference.zipWithIndex.foreach { v =>
+ assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
+ if (offHeap) {
+ val addr = column.valuesNativeAddress()
+ assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
+ }
+ }
+ column.close
+ }}
+ }
+
+ test("ColumnarBatch basic") {
+ (false :: true :: Nil).foreach { offHeap => {
+ val schema = new StructType()
+ .add("intCol", IntegerType)
+ .add("doubleCol", DoubleType)
+ .add("intCol2", IntegerType)
+
+ val batch = ColumnarBatch.allocate(schema, offHeap)
+ assert(batch.numCols() == 3)
+ assert(batch.numRows() == 0)
+ assert(batch.numValidRows() == 0)
+ assert(batch.capacity() > 0)
+ assert(batch.rowIterator().hasNext == false)
+
+ // Add a row [1, 1.1, NULL]
+ batch.column(0).putInt(0, 1)
+ batch.column(1).putDouble(0, 1.1)
+ batch.column(2).putNull(0)
+ batch.setNumRows(1)
+
+ // Verify the results of the row.
+ assert(batch.numCols() == 3)
+ assert(batch.numRows() == 1)
+ assert(batch.numValidRows() == 1)
+ assert(batch.rowIterator().hasNext == true)
+ assert(batch.rowIterator().hasNext == true)
+
+ assert(batch.column(0).getInt(0) == 1)
+ assert(batch.column(0).getIsNull(0) == false)
+ assert(batch.column(1).getDouble(0) == 1.1)
+ assert(batch.column(1).getIsNull(0) == false)
+ assert(batch.column(2).getIsNull(0) == true)
+
+ // Verify the iterator works correctly.
+ val it = batch.rowIterator()
+ assert(it.hasNext())
+ val row = it.next()
+ assert(row.getInt(0) == 1)
+ assert(row.isNullAt(0) == false)
+ assert(row.getDouble(1) == 1.1)
+ assert(row.isNullAt(1) == false)
+ assert(row.isNullAt(2) == true)
+ assert(it.hasNext == false)
+ assert(it.hasNext == false)
+
+ // Filter out the row.
+ row.markFiltered()
+ assert(batch.numRows() == 1)
+ assert(batch.numValidRows() == 0)
+ assert(batch.rowIterator().hasNext == false)
+
+      // Reset and add 3 rows
+ batch.reset()
+ assert(batch.numRows() == 0)
+ assert(batch.numValidRows() == 0)
+ assert(batch.rowIterator().hasNext == false)
+
+ // Add rows [NULL, 2.2, 2], [3, NULL, 3], [4, 4.4, 4]
+ batch.column(0).putNull(0)
+ batch.column(1).putDouble(0, 2.2)
+ batch.column(2).putInt(0, 2)
+
+ batch.column(0).putInt(1, 3)
+ batch.column(1).putNull(1)
+ batch.column(2).putInt(1, 3)
+
+ batch.column(0).putInt(2, 4)
+ batch.column(1).putDouble(2, 4.4)
+ batch.column(2).putInt(2, 4)
+ batch.setNumRows(3)
+
+ def rowEquals(x: InternalRow, y: Row): Unit = {
+ assert(x.isNullAt(0) == y.isNullAt(0))
+ if (!x.isNullAt(0)) assert(x.getInt(0) == y.getInt(0))
+
+ assert(x.isNullAt(1) == y.isNullAt(1))
+ if (!x.isNullAt(1)) assert(x.getDouble(1) == y.getDouble(1))
+
+ assert(x.isNullAt(2) == y.isNullAt(2))
+ if (!x.isNullAt(2)) assert(x.getInt(2) == y.getInt(2))
+ }
+ // Verify
+ assert(batch.numRows() == 3)
+ assert(batch.numValidRows() == 3)
+ val it2 = batch.rowIterator()
+ rowEquals(it2.next(), Row(null, 2.2, 2))
+ rowEquals(it2.next(), Row(3, null, 3))
+ rowEquals(it2.next(), Row(4, 4.4, 4))
+      assert(!it2.hasNext)
+
+ // Filter out some rows and verify
+ batch.markFiltered(1)
+ assert(batch.numValidRows() == 2)
+ val it3 = batch.rowIterator()
+ rowEquals(it3.next(), Row(null, 2.2, 2))
+ rowEquals(it3.next(), Row(4, 4.4, 4))
+      assert(!it3.hasNext)
+
+ batch.markFiltered(2)
+ assert(batch.numValidRows() == 1)
+ val it4 = batch.rowIterator()
+ rowEquals(it4.next(), Row(null, 2.2, 2))
+
+ batch.close
+ }}
+ }
+}