From 0a64294fcb4b64bfe095c63c3a494e0f40e22743 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Wed, 23 Mar 2016 12:13:32 -0700
Subject: [SPARK-14015][SQL] Support TimestampType in vectorized parquet reader

## What changes were proposed in this pull request?

This PR adds support for TimestampType in the vectorized parquet reader.

## How was this patch tested?

1. `VectorizedColumnReader` initially had a gating condition on `primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96` that made us fall back on parquet-mr for handling timestamps. This condition is now removed.
2. The `ParquetHadoopFsRelationSuite` (which tests all supported Hive types, including `TimestampType`) fails when the gating condition is removed (https://github.com/apache/spark/pull/11808) and should now pass with this change. Similarly, the `ParquetHiveCompatibilitySuite` test `SPARK-10177 timestamp`, which fails when the gating condition is removed, should now pass as well.
3. Added tests in `HadoopFsRelationTest` that exercise both the dictionary-encoded and non-encoded paths across all supported data types.

Author: Sameer Agarwal

Closes #11882 from sameeragarwal/timestamp-parquet.
---
 .../parquet/VectorizedColumnReader.java            | 29 +++++++-
 .../parquet/VectorizedParquetRecordReader.java     | 13 ----
 .../execution/vectorized/OffHeapColumnVector.java  |  2 +-
 .../execution/vectorized/OnHeapColumnVector.java   |  3 +-
 .../datasources/parquet/CatalystRowConverter.scala | 10 +++
 .../spark/sql/sources/hadoopFsRelationSuites.scala | 82 +++++++++++++---------
 6 files changed, 86 insertions(+), 53 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 2c23ccc357..6cc2fda587 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -213,6 +213,9 @@ public class VectorizedColumnReader {
       case INT64:
         readLongBatch(rowId, num, column);
         break;
+      case INT96:
+        readBinaryBatch(rowId, num, column);
+        break;
       case FLOAT:
         readFloatBatch(rowId, num, column);
         break;
@@ -249,7 +252,17 @@ public class VectorizedColumnReader {
       case BINARY:
         column.setDictionary(dictionary);
         break;
-
+      case INT96:
+        if (column.dataType() == DataTypes.TimestampType) {
+          for (int i = rowId; i < rowId + num; ++i) {
+            // TODO: Convert dictionary of Binaries to dictionary of Longs
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v));
+          }
+        } else {
+          throw new NotImplementedException();
+        }
+        break;
       case FIXED_LEN_BYTE_ARRAY:
         // DecimalType written in the legacy mode
         if (DecimalType.is32BitDecimalType(column.dataType())) {
@@ -342,9 +355,19 @@ public class VectorizedColumnReader {
   private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
+    VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
     if (column.isArray()) {
-      defColumn.readBinarys(
-          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      defColumn.readBinarys(num, column, rowId, maxDefLevel, data);
+    } else if (column.dataType() == DataTypes.TimestampType) {
+      for (int i = 0; i < num; i++) {
+        if (defColumn.readInteger() == maxDefLevel) {
+          column.putLong(rowId + i,
+            // Read 12 bytes for INT96
+            CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
+        } else {
+          column.putNull(rowId + i);
+        }
+      }
     } else {
       throw new NotImplementedException("Unimplemented type: " + column.dataType());
     }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 9ac251391b..ab09208d5a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -252,26 +252,13 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     /**
      * Check that the requested schema is supported.
      */
-    OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
     missingColumns = new boolean[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
       if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
         throw new IOException("Complex types not supported.");
       }
-      PrimitiveType primitiveType = t.asPrimitiveType();
-      originalTypes[i] = t.getOriginalType();
-
-      // TODO: Be extremely cautious in what is supported. Expand this.
-      if (originalTypes[i] != null && originalTypes[i] != OriginalType.DECIMAL &&
-          originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != OriginalType.DATE &&
-          originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != OriginalType.INT_16) {
-        throw new IOException("Unsupported type: " + t);
-      }
-      if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
-        throw new IOException("Int96 not supported.");
-      }
 
       String[] colPath = requestedSchema.getPaths().get(i);
       if (fileSchema.containsPath(colPath)) {
         ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 689e6a2a6d..b190141135 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -407,7 +407,7 @@ public final class OffHeapColumnVector extends ColumnVector {
         type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
       this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4);
     } else if (type instanceof LongType || type instanceof DoubleType ||
-        DecimalType.is64BitDecimalType(type)) {
+        DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
       this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8);
     } else if (resultStruct != null) {
       // Nothing to store.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index f332e87016..b1429fe7cb 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -403,7 +403,8 @@ public final class OnHeapColumnVector extends ColumnVector {
       int[] newData = new int[newCapacity];
       if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended);
       intData = newData;
-    } else if (type instanceof LongType || DecimalType.is64BitDecimalType(type)) {
+    } else if (type instanceof LongType || type instanceof TimestampType ||
+        DecimalType.is64BitDecimalType(type)) {
       long[] newData = new long[newCapacity];
       if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended);
       longData = newData;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index de6dd0fe3e..6bf82bee67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -659,4 +660,13 @@ private[parquet] object CatalystRowConverter {
     unscaled = (unscaled << (64 - bits)) >> (64 - bits)
     unscaled
   }
+
+  def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
+    assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
+      s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
+    val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+    val timeOfDayNanos = buffer.getLong
+    val julianDay = buffer.getInt
+    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7e5506ee4a..e842caf5be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -116,44 +116,56 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     new MyDenseVectorUDT()
   ).filter(supportsDataType)
 
-  for (dataType <- supportedDataTypes) {
-    test(s"test all data types - $dataType") {
-      withTempPath { file =>
-        val path = file.getCanonicalPath
-
-        val dataGenerator = RandomDataGenerator.forType(
-          dataType = dataType,
-          nullable = true,
-          new Random(System.nanoTime())
-        ).getOrElse {
-          fail(s"Failed to create data generator for schema $dataType")
+  try {
+    for (dataType <- supportedDataTypes) {
+      for (parquetDictionaryEncodingEnabled <- Seq(true, false)) {
+        test(s"test all data types - $dataType with parquet.enable.dictionary = " +
+          s"$parquetDictionaryEncodingEnabled") {
+
+          hadoopConfiguration.setBoolean("parquet.enable.dictionary",
+            parquetDictionaryEncodingEnabled)
+
+          withTempPath { file =>
+            val path = file.getCanonicalPath
+
+            val dataGenerator = RandomDataGenerator.forType(
+              dataType = dataType,
+              nullable = true,
+              new Random(System.nanoTime())
+            ).getOrElse {
+              fail(s"Failed to create data generator for schema $dataType")
+            }
+
+            // Create a DF for the schema with random data. The index field is used to sort the
+            // DataFrame. This is a workaround for SPARK-10591.
+            val schema = new StructType()
+              .add("index", IntegerType, nullable = false)
+              .add("col", dataType, nullable = true)
+            val rdd =
+              sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+            val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+            df.write
+              .mode("overwrite")
+              .format(dataSourceName)
+              .option("dataSchema", df.schema.json)
+              .save(path)
+
+            val loadedDF = sqlContext
+              .read
+              .format(dataSourceName)
+              .option("dataSchema", df.schema.json)
+              .schema(df.schema)
+              .load(path)
+              .orderBy("index")
+
+            checkAnswer(loadedDF, df)
+          }
         }
-
-        // Create a DF for the schema with random data. The index field is used to sort the
-        // DataFrame. This is a workaround for SPARK-10591.
-        val schema = new StructType()
-          .add("index", IntegerType, nullable = false)
-          .add("col", dataType, nullable = true)
-        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
-        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
-        df.write
-          .mode("overwrite")
-          .format(dataSourceName)
-          .option("dataSchema", df.schema.json)
-          .save(path)
-
-        val loadedDF = sqlContext
-          .read
-          .format(dataSourceName)
-          .option("dataSchema", df.schema.json)
-          .schema(df.schema)
-          .load(path)
-          .orderBy("index")
-
-        checkAnswer(loadedDF, df)
       }
     }
+  } finally {
+    hadoopConfiguration.unset("parquet.enable.dictionary")
   }
 
   test("save()/load() - non-partitioned table - Overwrite") {
-- 
cgit v1.2.3
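
For reference, a standalone sketch (not part of this patch) of the INT96 decoding performed by `CatalystRowConverter.binaryToSQLTimestamp` above. It assumes Parquet's INT96 timestamp layout of 8 bytes of nanoseconds-of-day followed by a 4-byte Julian day number, both little-endian, and Spark's `SQLTimestamp` convention of microseconds since the Unix epoch; the `Int96Sketch` and `int96ToMicros` names are hypothetical and used only for illustration:

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Illustrative sketch: decode a Parquet INT96 timestamp into microseconds
// since the Unix epoch (the representation Spark uses for SQL timestamps).
object Int96Sketch {
  private val JulianDayOfEpoch = 2440588L                 // Julian day number of 1970-01-01
  private val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000  // 86,400,000,000 microseconds

  def int96ToMicros(bytes: Array[Byte]): Long = {
    require(bytes.length == 12, s"expected 12 bytes, got ${bytes.length}")
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val timeOfDayNanos = buf.getLong  // first 8 bytes: nanoseconds within the day
    val julianDay = buf.getInt        // last 4 bytes: Julian day number
    (julianDay - JulianDayOfEpoch) * MicrosPerDay + timeOfDayNanos / 1000
  }
}
```

For example, a value encoding Julian day 2440588 with zero nanoseconds-of-day decodes to 0 microseconds, i.e. 1970-01-01T00:00:00 UTC, which matches what `DateTimeUtils.fromJulianDay` returns for the same inputs.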