author    Sameer Agarwal <sameer@databricks.com>  2016-03-18 22:33:43 -0700
committer Reynold Xin <rxin@databricks.com>       2016-03-18 22:33:43 -0700
commit   b39594472b69b4eafc11f1a74eb47872215bdfdd (patch)
tree     8dfd7679c74a896fa7dc1f30587bd3f28efde907 /sql/core
parent   c11ea2e4138acdd8d4ed487049ded35346bca528 (diff)
download spark-b39594472b69b4eafc11f1a74eb47872215bdfdd.tar.gz
         spark-b39594472b69b4eafc11f1a74eb47872215bdfdd.tar.bz2
         spark-b39594472b69b4eafc11f1a74eb47872215bdfdd.zip
[SPARK-14012][SQL] Extract VectorizedColumnReader from VectorizedParquetRecordReader
## What changes were proposed in this pull request?

This is a minor follow-up to https://github.com/apache/spark/pull/11799 that extracts `VectorizedColumnReader` out of `VectorizedParquetRecordReader` into its own file.

## How was this patch tested?

N/A (refactoring only)

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11834 from sameeragarwal/rename.
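For context, here is a minimal sketch (not part of this commit) of how a same-package caller might drive the extracted class: one `VectorizedColumnReader` is constructed per requested column from a `PageReadStore` row group, and `readBatch` decodes values straight into a `ColumnVector`. Only `VectorizedColumnReader`, `readBatch`, `PageReadStore.getPageReader`, and `ColumnVector` come from the diff below; the wrapper class, method, and parameter names are illustrative assumptions.

```java
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.util.List;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;

import org.apache.spark.sql.execution.vectorized.ColumnVector;

// Hypothetical usage sketch; it sits in the same package because
// readBatch is package-private after the extraction.
class ColumnReaderUsageSketch {
  void readRowGroup(PageReadStore pages, List<ColumnDescriptor> columns,
      ColumnVector[] vectors, int batchSize) throws IOException {
    VectorizedColumnReader[] readers = new VectorizedColumnReader[columns.size()];
    for (int i = 0; i < readers.length; i++) {
      // Each reader pulls pages for exactly one column chunk of this row group.
      readers[i] = new VectorizedColumnReader(
          columns.get(i), pages.getPageReader(columns.get(i)));
    }
    for (int i = 0; i < readers.length; i++) {
      // Decode one batch per column directly into the column vectors; a real
      // caller clamps batchSize to the rows remaining in the row group.
      readers[i].readBatch(batchSize, vectors[i]);
    }
  }
}
```

Keeping the reader as a top-level class lets the batch-decoding logic be exercised and evolved independently of the record reader's row-group plumbing.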
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java         475
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java  451
2 files changed, 476 insertions(+), 450 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
new file mode 100644
index 0000000000..46c84c5dd4
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.ValuesReaderIntIterator;
+import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.createRLEIterator;
+
+/**
+ * Decoder to return values from a single column.
+ */
+public class VectorizedColumnReader {
+ /**
+ * Total number of values read.
+ */
+ private long valuesRead;
+
+ /**
+ * Value that indicates the end of the current page. That is,
+ * if valuesRead == endOfPageValueCount, we are at the end of the page.
+ */
+ private long endOfPageValueCount;
+
+ /**
+ * The dictionary, if this column has dictionary encoding.
+ */
+ private final Dictionary dictionary;
+
+ /**
+ * If true, the current page is dictionary encoded.
+ */
+ private boolean useDictionary;
+
+ /**
+ * Maximum definition level for this column.
+ */
+ private final int maxDefLevel;
+
+ /**
+ * Repetition/Definition/Value readers.
+ */
+ private SpecificParquetRecordReaderBase.IntIterator repetitionLevelColumn;
+ private SpecificParquetRecordReaderBase.IntIterator definitionLevelColumn;
+ private ValuesReader dataColumn;
+
+ // Only set if vectorized decoding is true. This is used instead of the row-by-row decoding
+ // with `definitionLevelColumn`.
+ private VectorizedRleValuesReader defColumn;
+
+ /**
+ * Total number of values in this column (in this row group).
+ */
+ private final long totalValueCount;
+
+ /**
+ * Total values in the current page.
+ */
+ private int pageValueCount;
+
+ private final PageReader pageReader;
+ private final ColumnDescriptor descriptor;
+
+ public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
+ throws IOException {
+ this.descriptor = descriptor;
+ this.pageReader = pageReader;
+ this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ try {
+ this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+ this.useDictionary = true;
+ } catch (IOException e) {
+ throw new IOException("could not decode the dictionary for " + descriptor, e);
+ }
+ } else {
+ this.dictionary = null;
+ this.useDictionary = false;
+ }
+ this.totalValueCount = pageReader.getTotalValueCount();
+ if (totalValueCount == 0) {
+ throw new IOException("totalValueCount == 0");
+ }
+ }
+
+ /**
+ * TODO: Hoist the useDictionary branch to decode*Batch and make the batch page aligned.
+ */
+ public boolean nextBoolean() {
+ if (!useDictionary) {
+ return dataColumn.readBoolean();
+ } else {
+ return dictionary.decodeToBoolean(dataColumn.readValueDictionaryId());
+ }
+ }
+
+ public int nextInt() {
+ if (!useDictionary) {
+ return dataColumn.readInteger();
+ } else {
+ return dictionary.decodeToInt(dataColumn.readValueDictionaryId());
+ }
+ }
+
+ public long nextLong() {
+ if (!useDictionary) {
+ return dataColumn.readLong();
+ } else {
+ return dictionary.decodeToLong(dataColumn.readValueDictionaryId());
+ }
+ }
+
+ public float nextFloat() {
+ if (!useDictionary) {
+ return dataColumn.readFloat();
+ } else {
+ return dictionary.decodeToFloat(dataColumn.readValueDictionaryId());
+ }
+ }
+
+ public double nextDouble() {
+ if (!useDictionary) {
+ return dataColumn.readDouble();
+ } else {
+ return dictionary.decodeToDouble(dataColumn.readValueDictionaryId());
+ }
+ }
+
+ public Binary nextBinary() {
+ if (!useDictionary) {
+ return dataColumn.readBytes();
+ } else {
+ return dictionary.decodeToBinary(dataColumn.readValueDictionaryId());
+ }
+ }
+
+ /**
+ * Advances to the next value. Returns true if the value is non-null.
+ */
+ private boolean next() throws IOException {
+ if (valuesRead >= endOfPageValueCount) {
+ if (valuesRead >= totalValueCount) {
+ // How do we get here? Throw end of stream exception?
+ return false;
+ }
+ readPage();
+ }
+ ++valuesRead;
+ // TODO: Don't read for flat schemas
+ //repetitionLevel = repetitionLevelColumn.nextInt();
+ return definitionLevelColumn.nextInt() == maxDefLevel;
+ }
+
+ /**
+ * Reads `total` values from this columnReader into column.
+ */
+ void readBatch(int total, ColumnVector column) throws IOException {
+ int rowId = 0;
+ while (total > 0) {
+ // Compute the number of values we want to read in this page.
+ int leftInPage = (int) (endOfPageValueCount - valuesRead);
+ if (leftInPage == 0) {
+ readPage();
+ leftInPage = (int) (endOfPageValueCount - valuesRead);
+ }
+ int num = Math.min(total, leftInPage);
+ if (useDictionary) {
+ // Read and decode dictionary ids.
+ ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
+ defColumn.readIntegers(
+ num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ decodeDictionaryIds(rowId, num, column, dictionaryIds);
+ } else {
+ column.setDictionary(null);
+ switch (descriptor.getType()) {
+ case BOOLEAN:
+ readBooleanBatch(rowId, num, column);
+ break;
+ case INT32:
+ readIntBatch(rowId, num, column);
+ break;
+ case INT64:
+ readLongBatch(rowId, num, column);
+ break;
+ case FLOAT:
+ readFloatBatch(rowId, num, column);
+ break;
+ case DOUBLE:
+ readDoubleBatch(rowId, num, column);
+ break;
+ case BINARY:
+ readBinaryBatch(rowId, num, column);
+ break;
+ case FIXED_LEN_BYTE_ARRAY:
+ readFixedLenByteArrayBatch(rowId, num, column, descriptor.getTypeLength());
+ break;
+ default:
+ throw new IOException("Unsupported type: " + descriptor.getType());
+ }
+ }
+
+ valuesRead += num;
+ rowId += num;
+ total -= num;
+ }
+ }
+
+ /**
+ * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
+ */
+ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
+ ColumnVector dictionaryIds) {
+ switch (descriptor.getType()) {
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ column.setDictionary(dictionary);
+ break;
+
+ case FIXED_LEN_BYTE_ARRAY:
+ // DecimalType written in the legacy mode
+ if (DecimalType.is32BitDecimalType(column.dataType())) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v));
+ }
+ } else if (DecimalType.is64BitDecimalType(column.dataType())) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
+ }
+ } else {
+ throw new NotImplementedException();
+ }
+ break;
+
+ default:
+ throw new NotImplementedException("Unsupported type: " + descriptor.getType());
+ }
+ }
+
+ /**
+ * For all the read*Batch functions, reads `num` values from this columnReader into column. It
+ * is guaranteed that num is no greater than the number of values left in the current page.
+ */
+
+ private void readBooleanBatch(int rowId, int num, ColumnVector column) throws IOException {
+ assert(column.dataType() == DataTypes.BooleanType);
+ defColumn.readBooleans(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ }
+
+ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOException {
+ // This is where we implement support for the valid type conversions.
+ // TODO: implement remaining type conversions
+ if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
+ DecimalType.is32BitDecimalType(column.dataType())) {
+ defColumn.readIntegers(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else if (column.dataType() == DataTypes.ByteType) {
+ defColumn.readBytes(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else if (column.dataType() == DataTypes.ShortType) {
+ defColumn.readShorts(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else {
+ throw new NotImplementedException("Unimplemented type: " + column.dataType());
+ }
+ }
+
+ private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException {
+ // This is where we implement support for the valid type conversions.
+ if (column.dataType() == DataTypes.LongType ||
+ DecimalType.is64BitDecimalType(column.dataType())) {
+ defColumn.readLongs(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else {
+ throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
+ }
+ }
+
+ private void readFloatBatch(int rowId, int num, ColumnVector column) throws IOException {
+ // This is where we implement support for the valid type conversions.
+ // TODO: support implicit cast to double?
+ if (column.dataType() == DataTypes.FloatType) {
+ defColumn.readFloats(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else {
+ throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
+ }
+ }
+
+ private void readDoubleBatch(int rowId, int num, ColumnVector column) throws IOException {
+ // This is where we implement support for the valid type conversions.
+ // TODO: implement remaining type conversions
+ if (column.dataType() == DataTypes.DoubleType) {
+ defColumn.readDoubles(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else {
+ throw new NotImplementedException("Unimplemented type: " + column.dataType());
+ }
+ }
+
+ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException {
+ // This is where we implement support for the valid type conversions.
+ // TODO: implement remaining type conversions
+ if (column.isArray()) {
+ defColumn.readBinarys(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else {
+ throw new NotImplementedException("Unimplemented type: " + column.dataType());
+ }
+ }
+
+ private void readFixedLenByteArrayBatch(int rowId, int num,
+ ColumnVector column, int arrayLen) throws IOException {
+ VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
+ // This is where we implement support for the valid type conversions.
+ // TODO: implement remaining type conversions
+ if (DecimalType.is32BitDecimalType(column.dataType())) {
+ for (int i = 0; i < num; i++) {
+ if (defColumn.readInteger() == maxDefLevel) {
+ column.putInt(rowId + i,
+ (int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+ } else {
+ column.putNull(rowId + i);
+ }
+ }
+ } else if (DecimalType.is64BitDecimalType(column.dataType())) {
+ for (int i = 0; i < num; i++) {
+ if (defColumn.readInteger() == maxDefLevel) {
+ column.putLong(rowId + i,
+ CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+ } else {
+ column.putNull(rowId + i);
+ }
+ }
+ } else {
+ throw new NotImplementedException("Unimplemented type: " + column.dataType());
+ }
+ }
+
+ private void readPage() throws IOException {
+ DataPage page = pageReader.readPage();
+ // TODO: Why is this a visitor?
+ page.accept(new DataPage.Visitor<Void>() {
+ @Override
+ public Void visit(DataPageV1 dataPageV1) {
+ try {
+ readPageV1(dataPageV1);
+ return null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Void visit(DataPageV2 dataPageV2) {
+ try {
+ readPageV2(dataPageV2);
+ return null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
+ this.endOfPageValueCount = valuesRead + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
+ this.dataColumn = null;
+ if (dictionary == null) {
+ throw new IOException(
+ "could not read page in col " + descriptor +
+ " as the dictionary was missing for encoding " + dataEncoding);
+ }
+ @SuppressWarnings("deprecation")
+ Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
+ if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
+ throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
+ }
+ this.dataColumn = new VectorizedRleValuesReader();
+ this.useDictionary = true;
+ } else {
+ if (dataEncoding != Encoding.PLAIN) {
+ throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
+ }
+ this.dataColumn = new VectorizedPlainValuesReader();
+ this.useDictionary = false;
+ }
+
+ try {
+ dataColumn.initFromPage(pageValueCount, bytes, offset);
+ } catch (IOException e) {
+ throw new IOException("could not read page in col " + descriptor, e);
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+ ValuesReader dlReader;
+
+ // Initialize the decoders.
+ if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
+ throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
+ }
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ this.defColumn = new VectorizedRleValuesReader(bitWidth);
+ dlReader = this.defColumn;
+ this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+ this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+ try {
+ byte[] bytes = page.getBytes().toByteArray();
+ rlReader.initFromPage(pageValueCount, bytes, 0);
+ int next = rlReader.getNextOffset();
+ dlReader.initFromPage(pageValueCount, bytes, next);
+ next = dlReader.getNextOffset();
+ initDataReader(page.getValueEncoding(), bytes, next);
+ } catch (IOException e) {
+ throw new IOException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private void readPageV2(DataPageV2 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
+ this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
+ page.getRepetitionLevels(), descriptor);
+
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ this.defColumn = new VectorizedRleValuesReader(bitWidth);
+ this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
+ this.defColumn.initFromBuffer(
+ this.pageValueCount, page.getDefinitionLevels().toByteArray());
+ try {
+ initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
+ } catch (IOException e) {
+ throw new IOException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 0f00f56a3a..ef44b62a8b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -20,28 +20,17 @@ package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
import java.util.List;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.*;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
/**
* A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -245,444 +234,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
}
}
- /**
- * Decoder to return values from a single column.
- */
- private class VectorizedColumnReader {
- /**
- * Total number of values read.
- */
- private long valuesRead;
-
- /**
- * value that indicates the end of the current page. That is,
- * if valuesRead == endOfPageValueCount, we are at the end of the page.
- */
- private long endOfPageValueCount;
-
- /**
- * The dictionary, if this column has dictionary encoding.
- */
- private final Dictionary dictionary;
-
- /**
- * If true, the current page is dictionary encoded.
- */
- private boolean useDictionary;
-
- /**
- * Maximum definition level for this column.
- */
- private final int maxDefLevel;
-
- /**
- * Repetition/Definition/Value readers.
- */
- private IntIterator repetitionLevelColumn;
- private IntIterator definitionLevelColumn;
- private ValuesReader dataColumn;
-
- // Only set if vectorized decoding is true. This is used instead of the row by row decoding
- // with `definitionLevelColumn`.
- private VectorizedRleValuesReader defColumn;
-
- /**
- * Total number of values in this column (in this row group).
- */
- private final long totalValueCount;
-
- /**
- * Total values in the current page.
- */
- private int pageValueCount;
-
- private final PageReader pageReader;
- private final ColumnDescriptor descriptor;
-
- public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
- throws IOException {
- this.descriptor = descriptor;
- this.pageReader = pageReader;
- this.maxDefLevel = descriptor.getMaxDefinitionLevel();
-
- DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
- if (dictionaryPage != null) {
- try {
- this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
- this.useDictionary = true;
- } catch (IOException e) {
- throw new IOException("could not decode the dictionary for " + descriptor, e);
- }
- } else {
- this.dictionary = null;
- this.useDictionary = false;
- }
- this.totalValueCount = pageReader.getTotalValueCount();
- if (totalValueCount == 0) {
- throw new IOException("totalValueCount == 0");
- }
- }
-
- /**
- * TODO: Hoist the useDictionary branch to decode*Batch and make the batch page aligned.
- */
- public boolean nextBoolean() {
- if (!useDictionary) {
- return dataColumn.readBoolean();
- } else {
- return dictionary.decodeToBoolean(dataColumn.readValueDictionaryId());
- }
- }
-
- public int nextInt() {
- if (!useDictionary) {
- return dataColumn.readInteger();
- } else {
- return dictionary.decodeToInt(dataColumn.readValueDictionaryId());
- }
- }
-
- public long nextLong() {
- if (!useDictionary) {
- return dataColumn.readLong();
- } else {
- return dictionary.decodeToLong(dataColumn.readValueDictionaryId());
- }
- }
-
- public float nextFloat() {
- if (!useDictionary) {
- return dataColumn.readFloat();
- } else {
- return dictionary.decodeToFloat(dataColumn.readValueDictionaryId());
- }
- }
-
- public double nextDouble() {
- if (!useDictionary) {
- return dataColumn.readDouble();
- } else {
- return dictionary.decodeToDouble(dataColumn.readValueDictionaryId());
- }
- }
-
- public Binary nextBinary() {
- if (!useDictionary) {
- return dataColumn.readBytes();
- } else {
- return dictionary.decodeToBinary(dataColumn.readValueDictionaryId());
- }
- }
-
- /**
- * Advances to the next value. Returns true if the value is non-null.
- */
- private boolean next() throws IOException {
- if (valuesRead >= endOfPageValueCount) {
- if (valuesRead >= totalValueCount) {
- // How do we get here? Throw end of stream exception?
- return false;
- }
- readPage();
- }
- ++valuesRead;
- // TODO: Don't read for flat schemas
- //repetitionLevel = repetitionLevelColumn.nextInt();
- return definitionLevelColumn.nextInt() == maxDefLevel;
- }
-
- /**
- * Reads `total` values from this columnReader into column.
- */
- private void readBatch(int total, ColumnVector column) throws IOException {
- int rowId = 0;
- while (total > 0) {
- // Compute the number of values we want to read in this page.
- int leftInPage = (int)(endOfPageValueCount - valuesRead);
- if (leftInPage == 0) {
- readPage();
- leftInPage = (int)(endOfPageValueCount - valuesRead);
- }
- int num = Math.min(total, leftInPage);
- if (useDictionary) {
- // Read and decode dictionary ids.
- ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
- defColumn.readIntegers(
- num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- decodeDictionaryIds(rowId, num, column, dictionaryIds);
- } else {
- column.setDictionary(null);
- switch (descriptor.getType()) {
- case BOOLEAN:
- readBooleanBatch(rowId, num, column);
- break;
- case INT32:
- readIntBatch(rowId, num, column);
- break;
- case INT64:
- readLongBatch(rowId, num, column);
- break;
- case FLOAT:
- readFloatBatch(rowId, num, column);
- break;
- case DOUBLE:
- readDoubleBatch(rowId, num, column);
- break;
- case BINARY:
- readBinaryBatch(rowId, num, column);
- break;
- case FIXED_LEN_BYTE_ARRAY:
- readFixedLenByteArrayBatch(rowId, num, column, descriptor.getTypeLength());
- break;
- default:
- throw new IOException("Unsupported type: " + descriptor.getType());
- }
- }
-
- valuesRead += num;
- rowId += num;
- total -= num;
- }
- }
-
- /**
- * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
- */
- private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
- ColumnVector dictionaryIds) {
- switch (descriptor.getType()) {
- case INT32:
- case INT64:
- case FLOAT:
- case DOUBLE:
- case BINARY:
- column.setDictionary(dictionary);
- break;
-
- case FIXED_LEN_BYTE_ARRAY:
- // DecimalType written in the legacy mode
- if (DecimalType.is32BitDecimalType(column.dataType())) {
- for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
- column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v));
- }
- } else if (DecimalType.is64BitDecimalType(column.dataType())) {
- for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
- column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
- }
- } else {
- throw new NotImplementedException();
- }
- break;
-
- default:
- throw new NotImplementedException("Unsupported type: " + descriptor.getType());
- }
- }
-
- /**
- * For all the read*Batch functions, reads `num` values from this columnReader into column. It
- * is guaranteed that num is smaller than the number of values left in the current page.
- */
-
- private void readBooleanBatch(int rowId, int num, ColumnVector column) throws IOException {
- assert(column.dataType() == DataTypes.BooleanType);
- defColumn.readBooleans(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- }
-
- private void readIntBatch(int rowId, int num, ColumnVector column) throws IOException {
- // This is where we implement support for the valid type conversions.
- // TODO: implement remaining type conversions
- if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
- DecimalType.is32BitDecimalType(column.dataType())) {
- defColumn.readIntegers(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- } else if (column.dataType() == DataTypes.ByteType) {
- defColumn.readBytes(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- } else if (column.dataType() == DataTypes.ShortType) {
- defColumn.readShorts(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- } else {
- throw new NotImplementedException("Unimplemented type: " + column.dataType());
- }
- }
-
- private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException {
- // This is where we implement support for the valid type conversions.
- if (column.dataType() == DataTypes.LongType ||
- DecimalType.is64BitDecimalType(column.dataType())) {
- defColumn.readLongs(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- } else {
- throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
- }
- }
-
- private void readFloatBatch(int rowId, int num, ColumnVector column) throws IOException {
- // This is where we implement support for the valid type conversions.
- // TODO: support implicit cast to double?
- if (column.dataType() == DataTypes.FloatType) {
- defColumn.readFloats(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- } else {
- throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType());
- }
- }
-
- private void readDoubleBatch(int rowId, int num, ColumnVector column) throws IOException {
- // This is where we implement support for the valid type conversions.
- // TODO: implement remaining type conversions
- if (column.dataType() == DataTypes.DoubleType) {
- defColumn.readDoubles(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- } else {
- throw new NotImplementedException("Unimplemented type: " + column.dataType());
- }
- }
-
- private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException {
- // This is where we implement support for the valid type conversions.
- // TODO: implement remaining type conversions
- if (column.isArray()) {
- defColumn.readBinarys(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- } else {
- throw new NotImplementedException("Unimplemented type: " + column.dataType());
- }
- }
-
- private void readFixedLenByteArrayBatch(int rowId, int num,
- ColumnVector column, int arrayLen) throws IOException {
- VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
- // This is where we implement support for the valid type conversions.
- // TODO: implement remaining type conversions
- if (DecimalType.is32BitDecimalType(column.dataType())) {
- for (int i = 0; i < num; i++) {
- if (defColumn.readInteger() == maxDefLevel) {
- column.putInt(rowId + i,
- (int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
- } else {
- column.putNull(rowId + i);
- }
- }
- } else if (DecimalType.is64BitDecimalType(column.dataType())) {
- for (int i = 0; i < num; i++) {
- if (defColumn.readInteger() == maxDefLevel) {
- column.putLong(rowId + i,
- CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
- } else {
- column.putNull(rowId + i);
- }
- }
- } else {
- throw new NotImplementedException("Unimplemented type: " + column.dataType());
- }
- }
-
- private void readPage() throws IOException {
- DataPage page = pageReader.readPage();
- // TODO: Why is this a visitor?
- page.accept(new DataPage.Visitor<Void>() {
- @Override
- public Void visit(DataPageV1 dataPageV1) {
- try {
- readPageV1(dataPageV1);
- return null;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Void visit(DataPageV2 dataPageV2) {
- try {
- readPageV2(dataPageV2);
- return null;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
-
- private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
- this.endOfPageValueCount = valuesRead + pageValueCount;
- if (dataEncoding.usesDictionary()) {
- this.dataColumn = null;
- if (dictionary == null) {
- throw new IOException(
- "could not read page in col " + descriptor +
- " as the dictionary was missing for encoding " + dataEncoding);
- }
- @SuppressWarnings("deprecation")
- Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
- if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
- throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
- }
- this.dataColumn = new VectorizedRleValuesReader();
- this.useDictionary = true;
- } else {
- if (dataEncoding != Encoding.PLAIN) {
- throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
- }
- this.dataColumn = new VectorizedPlainValuesReader();
- this.useDictionary = false;
- }
-
- try {
- dataColumn.initFromPage(pageValueCount, bytes, offset);
- } catch (IOException e) {
- throw new IOException("could not read page in col " + descriptor, e);
- }
- }
-
- private void readPageV1(DataPageV1 page) throws IOException {
- this.pageValueCount = page.getValueCount();
- ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
- ValuesReader dlReader;
-
- // Initialize the decoders.
- if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
- throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
- }
- int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
- this.defColumn = new VectorizedRleValuesReader(bitWidth);
- dlReader = this.defColumn;
- this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
- this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
- try {
- byte[] bytes = page.getBytes().toByteArray();
- rlReader.initFromPage(pageValueCount, bytes, 0);
- int next = rlReader.getNextOffset();
- dlReader.initFromPage(pageValueCount, bytes, next);
- next = dlReader.getNextOffset();
- initDataReader(page.getValueEncoding(), bytes, next);
- } catch (IOException e) {
- throw new IOException("could not read page " + page + " in col " + descriptor, e);
- }
- }
-
- private void readPageV2(DataPageV2 page) throws IOException {
- this.pageValueCount = page.getValueCount();
- this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
- page.getRepetitionLevels(), descriptor);
-
- int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
- this.defColumn = new VectorizedRleValuesReader(bitWidth);
- this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
- this.defColumn.initFromBuffer(
- this.pageValueCount, page.getDefinitionLevels().toByteArray());
- try {
- initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
- } catch (IOException e) {
- throw new IOException("could not read page " + page + " in col " + descriptor, e);
- }
- }
- }
-
private void checkEndOfRowGroup() throws IOException {
if (rowsReturned != totalCountLoadedSoFar) return;
PageReadStore pages = reader.readNextRowGroup();