-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java    5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala               24
2 files changed, 28 insertions, 1 deletion
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index d823275d85..04752ec5fe 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -60,6 +60,7 @@ import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;

/**
* Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -136,7 +137,9 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
this.requestedSchema = readContext.getRequestedSchema();
- this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema);
+ String sparkRequestedSchemaString =
+ configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
+ this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
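Note: the hunk above changes how the reader obtains the Catalyst schema. Instead of converting the Parquet requested schema back into a StructType (which loses the narrower ByteType/ShortType the user asked for), it parses the schema string that ParquetReadSupport serialized into the Hadoop Configuration. The following is a minimal Scala sketch of that round trip; the configuration key literal is a hypothetical stand-in (the patch reads the real key via ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA), and it uses the public DataType.fromJson rather than the package-private StructType.fromString used in the patch.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.{ByteType, DataType, StructType}

object RequestedSchemaRoundTrip {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA.
    val requestedSchemaKey = "demo.parquet.row.requested_schema"

    // Read-support side: serialize the Catalyst schema the caller requested.
    val requested = new StructType().add("f", ByteType)
    val conf = new Configuration(false)
    conf.set(requestedSchemaKey, requested.json)

    // Record-reader side: recover the StructType from the configuration instead of
    // re-deriving it from the Parquet file schema, so ByteType is not widened to IntegerType.
    val restored = DataType.fromJson(conf.get(requestedSchemaKey)).asInstanceOf[StructType]
    assert(restored == requested)
  }
}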
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 02b94452a1..7e83bcbb6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -680,6 +680,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
)
}
}
+
+ test("SPARK-16632: read Parquet int32 as ByteType and ShortType") {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ // When written to Parquet, `TINYINT` and `SMALLINT` should be converted into
+ // `int32 (INT_8)` and `int32 (INT_16)` respectively. However, Hive doesn't add the `INT_8`
+ // and `INT_16` annotations properly (HIVE-14294). Thus, when reading files written by Hive
+ // using Spark with the vectorized Parquet reader enabled, we may hit errors due to the type
+ // mismatch.
+ //
+ // Here we simulate Hive's behavior by writing a single `INT` field and then reading it
+ // back as `TINYINT` and `SMALLINT` in Spark to verify this issue.
+ Seq(1).toDF("f").write.parquet(path)
+
+ val withByteField = new StructType().add("f", ByteType)
+ checkAnswer(spark.read.schema(withByteField).parquet(path), Row(1: Byte))
+
+ val withShortField = new StructType().add("f", ShortType)
+ checkAnswer(spark.read.schema(withShortField).parquet(path), Row(1: Short))
+ }
+ }
+ }
}

object TestingUDT {
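For reference, the scenario the new test exercises can also be reproduced outside the test harness. The following is a minimal, self-contained sketch, assuming a local SparkSession and a hypothetical scratch path: it writes a plain INT column and reads it back with narrower user-specified schemas, which the vectorized reader only honors with this patch applied.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ByteType, ShortType, StructType}

object ReadInt32AsByteAndShort {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-16632 repro")
      .config("spark.sql.parquet.enableVectorizedReader", "true")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/spark-16632-int32"  // hypothetical scratch location

    // Write a plain INT column, mimicking a file whose int32 lacks the INT_8/INT_16 annotations.
    Seq(1).toDF("f").write.mode("overwrite").parquet(path)

    // Read it back as TINYINT and SMALLINT; with the patch the vectorized reader uses the
    // requested Catalyst schema instead of failing on the type mismatch.
    spark.read.schema(new StructType().add("f", ByteType)).parquet(path).show()
    spark.read.schema(new StructType().add("f", ShortType)).parquet(path).show()

    spark.stop()
  }
}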