author    Sameer Agarwal <sameer@databricks.com>  2016-03-23 12:13:32 -0700
committer Yin Huai <yhuai@databricks.com>         2016-03-23 12:13:32 -0700
commit    0a64294fcb4b64bfe095c63c3a494e0f40e22743 (patch)
tree      7ea8206553f976dae3154d79a98677b56d442cf9
parent    02d9c352c72a16725322678ef174c5c6e9f2c617 (diff)
[SPARK-14015][SQL] Support TimestampType in vectorized parquet reader
## What changes were proposed in this pull request?

This PR adds support for TimestampType in the vectorized parquet reader.

## How was this patch tested?

1. `VectorizedColumnReader` initially had a gating condition on `primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96` that made us fall back on parquet-mr for handling timestamps. This condition is now removed.
2. The `ParquetHadoopFsRelationSuite` (which tests all supported Hive types, including `TimestampType`) fails when the gating condition is removed (https://github.com/apache/spark/pull/11808) and should now pass with this change. Similarly, the `ParquetHiveCompatibilitySuite.SPARK-10177 timestamp` test, which fails when the gating condition is removed, should now pass as well.
3. Added tests in `HadoopFsRelationTest` that cover both the dictionary-encoded and non-encoded paths across all supported data types.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11882 from sameeragarwal/timestamp-parquet.
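For reference, the conversion at the heart of this change (added to `CatalystRowConverter` in the diff below) treats a Parquet INT96 value as 12 little-endian bytes: 8 bytes of nanoseconds within the day followed by a 4-byte Julian day, mapped to Spark's microseconds-since-epoch `SQLTimestamp`. The following is a minimal standalone sketch of that decoding; the object/method names and the inlined epoch constants are illustrative, not Spark's actual `DateTimeUtils` API.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Sketch of INT96 timestamp decoding: 8 LE bytes of nanos-in-day + 4 LE bytes of Julian day,
// converted to microseconds since the Unix epoch. Names and constants are for illustration.
object Int96Sketch {
  private val JulianDayOfEpoch = 2440588L                 // Julian day of 1970-01-01
  private val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000  // 86,400,000,000 microseconds

  def int96ToMicros(bytes: Array[Byte]): Long = {
    require(bytes.length == 12, s"expected 12 bytes, got ${bytes.length}")
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val timeOfDayNanos = buf.getLong                       // first 8 bytes
    val julianDay = buf.getInt                             // last 4 bytes
    (julianDay - JulianDayOfEpoch) * MicrosPerDay + timeOfDayNanos / 1000
  }
}
```

For example, an INT96 value whose last four bytes encode Julian day 2440588 (1970-01-01) with zero nanoseconds decodes to 0L under this sketch.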
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java | 29
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java | 13
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java | 2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 82
6 files changed, 86 insertions, 53 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 2c23ccc357..6cc2fda587 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -213,6 +213,9 @@ public class VectorizedColumnReader {
case INT64:
readLongBatch(rowId, num, column);
break;
+ case INT96:
+ readBinaryBatch(rowId, num, column);
+ break;
case FLOAT:
readFloatBatch(rowId, num, column);
break;
@@ -249,7 +252,17 @@ public class VectorizedColumnReader {
case BINARY:
column.setDictionary(dictionary);
break;
-
+ case INT96:
+ if (column.dataType() == DataTypes.TimestampType) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ // TODO: Convert dictionary of Binaries to dictionary of Longs
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v));
+ }
+ } else {
+ throw new NotImplementedException();
+ }
+ break;
case FIXED_LEN_BYTE_ARRAY:
// DecimalType written in the legacy mode
if (DecimalType.is32BitDecimalType(column.dataType())) {
@@ -342,9 +355,19 @@ public class VectorizedColumnReader {
private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
+ VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
if (column.isArray()) {
- defColumn.readBinarys(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ defColumn.readBinarys(num, column, rowId, maxDefLevel, data);
+ } else if (column.dataType() == DataTypes.TimestampType) {
+ for (int i = 0; i < num; i++) {
+ if (defColumn.readInteger() == maxDefLevel) {
+ column.putLong(rowId + i,
+ // Read 12 bytes for INT96
+ CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
+ } else {
+ column.putNull(rowId + i);
+ }
+ }
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 9ac251391b..ab09208d5a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -252,26 +252,13 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
/**
* Check that the requested schema is supported.
*/
- OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
missingColumns = new boolean[requestedSchema.getFieldCount()];
for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
Type t = requestedSchema.getFields().get(i);
if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
throw new IOException("Complex types not supported.");
}
- PrimitiveType primitiveType = t.asPrimitiveType();
- originalTypes[i] = t.getOriginalType();
-
- // TODO: Be extremely cautious in what is supported. Expand this.
- if (originalTypes[i] != null && originalTypes[i] != OriginalType.DECIMAL &&
- originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != OriginalType.DATE &&
- originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != OriginalType.INT_16) {
- throw new IOException("Unsupported type: " + t);
- }
- if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
- throw new IOException("Int96 not supported.");
- }
String[] colPath = requestedSchema.getPaths().get(i);
if (fileSchema.containsPath(colPath)) {
ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 689e6a2a6d..b190141135 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -407,7 +407,7 @@ public final class OffHeapColumnVector extends ColumnVector {
type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4);
} else if (type instanceof LongType || type instanceof DoubleType ||
- DecimalType.is64BitDecimalType(type)) {
+ DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8);
} else if (resultStruct != null) {
// Nothing to store.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index f332e87016..b1429fe7cb 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -403,7 +403,8 @@ public final class OnHeapColumnVector extends ColumnVector {
int[] newData = new int[newCapacity];
if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended);
intData = newData;
- } else if (type instanceof LongType || DecimalType.is64BitDecimalType(type)) {
+ } else if (type instanceof LongType || type instanceof TimestampType ||
+ DecimalType.is64BitDecimalType(type)) {
long[] newData = new long[newCapacity];
if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended);
longData = newData;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index de6dd0fe3e..6bf82bee67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -659,4 +660,13 @@ private[parquet] object CatalystRowConverter {
unscaled = (unscaled << (64 - bits)) >> (64 - bits)
unscaled
}
+
+ def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
+ assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
+ s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
+ val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+ val timeOfDayNanos = buffer.getLong
+ val julianDay = buffer.getInt
+ DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7e5506ee4a..e842caf5be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -116,44 +116,56 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
new MyDenseVectorUDT()
).filter(supportsDataType)
- for (dataType <- supportedDataTypes) {
- test(s"test all data types - $dataType") {
- withTempPath { file =>
- val path = file.getCanonicalPath
-
- val dataGenerator = RandomDataGenerator.forType(
- dataType = dataType,
- nullable = true,
- new Random(System.nanoTime())
- ).getOrElse {
- fail(s"Failed to create data generator for schema $dataType")
+ try {
+ for (dataType <- supportedDataTypes) {
+ for (parquetDictionaryEncodingEnabled <- Seq(true, false)) {
+ test(s"test all data types - $dataType with parquet.enable.dictionary = " +
+ s"$parquetDictionaryEncodingEnabled") {
+
+ hadoopConfiguration.setBoolean("parquet.enable.dictionary",
+ parquetDictionaryEncodingEnabled)
+
+ withTempPath { file =>
+ val path = file.getCanonicalPath
+
+ val dataGenerator = RandomDataGenerator.forType(
+ dataType = dataType,
+ nullable = true,
+ new Random(System.nanoTime())
+ ).getOrElse {
+ fail(s"Failed to create data generator for schema $dataType")
+ }
+
+ // Create a DF for the schema with random data. The index field is used to sort the
+ // DataFrame. This is a workaround for SPARK-10591.
+ val schema = new StructType()
+ .add("index", IntegerType, nullable = false)
+ .add("col", dataType, nullable = true)
+ val rdd =
+ sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+ val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+ df.write
+ .mode("overwrite")
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .save(path)
+
+ val loadedDF = sqlContext
+ .read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .schema(df.schema)
+ .load(path)
+ .orderBy("index")
+
+ checkAnswer(loadedDF, df)
+ }
}
-
- // Create a DF for the schema with random data. The index field is used to sort the
- // DataFrame. This is a workaround for SPARK-10591.
- val schema = new StructType()
- .add("index", IntegerType, nullable = false)
- .add("col", dataType, nullable = true)
- val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
- val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
- df.write
- .mode("overwrite")
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .save(path)
-
- val loadedDF = sqlContext
- .read
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .schema(df.schema)
- .load(path)
- .orderBy("index")
-
- checkAnswer(loadedDF, df)
}
}
+ } finally {
+ hadoopConfiguration.unset("parquet.enable.dictionary")
}
test("save()/load() - non-partitioned table - Overwrite") {