-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java          5
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java              6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala  49
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala                  21
4 files changed, 78 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 2fa476b9cf..900d7c431e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -86,8 +86,9 @@ public class ColumnVectorUtils {
col.getChildColumn(0).putInts(0, capacity, c.months);
col.getChildColumn(1).putLongs(0, capacity, c.microseconds);
} else if (t instanceof DateType) {
- Date date = (Date)row.get(fieldIdx, t);
- col.putInts(0, capacity, DateTimeUtils.fromJavaDate(date));
+ col.putInts(0, capacity, row.getInt(fieldIdx));
+ } else if (t instanceof TimestampType) {
+ col.putLongs(0, capacity, row.getLong(fieldIdx));
}
}
}
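
Why this fix works: internally, Catalyst stores DateType values as an Int (days since the Unix epoch) and TimestampType values as a Long (microseconds since the Unix epoch), so the partition-value InternalRow already holds primitives that can be broadcast into the column vector via getInt/getLong. The removed code instead expected a java.sql.Date object at that field, which is not what the partition row contains. A minimal Scala sketch of this encoding, using the same DateTimeUtils conversions the new test relies on (the object name is illustrative):

    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    object InternalEncodingSketch {
      def main(args: Array[String]): Unit = {
        // DateType is stored as an Int: days since 1970-01-01.
        val days: Int =
          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01"))
        // TimestampType is stored as a Long: microseconds since the epoch.
        val micros: Long =
          DateTimeUtils.fromJavaTimestamp(
            java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"))
        println(s"date -> $days days, timestamp -> $micros microseconds")
      }
    }
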
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index f3afa8f938..62abc2a821 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -137,6 +137,10 @@ public final class ColumnarBatch {
DataType dt = columns[i].dataType();
if (dt instanceof BooleanType) {
row.setBoolean(i, getBoolean(i));
+ } else if (dt instanceof ByteType) {
+ row.setByte(i, getByte(i));
+ } else if (dt instanceof ShortType) {
+ row.setShort(i, getShort(i));
} else if (dt instanceof IntegerType) {
row.setInt(i, getInt(i));
} else if (dt instanceof LongType) {
@@ -154,6 +158,8 @@ public final class ColumnarBatch {
row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
} else if (dt instanceof DateType) {
row.setInt(i, getInt(i));
+ } else if (dt instanceof TimestampType) {
+ row.setLong(i, getLong(i));
} else {
throw new RuntimeException("Not implemented. " + dt);
}
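
The ColumnarBatch change extends the type dispatch used when a batch row is copied into a GenericMutableRow: without ByteType, ShortType, and TimestampType branches, copying a row that contains such a column fell through to the "Not implemented" RuntimeException. A hedged Scala sketch of the same dispatch pattern (copyField is an illustrative name, not a Spark API):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
    import org.apache.spark.sql.types._

    // Map each Catalyst DataType to its typed getter/setter pair; DateType and
    // TimestampType reuse the Int and Long primitives they are encoded as.
    def copyField(src: InternalRow, dst: GenericMutableRow, i: Int, dt: DataType): Unit =
      dt match {
        case BooleanType              => dst.setBoolean(i, src.getBoolean(i))
        case ByteType                 => dst.setByte(i, src.getByte(i))
        case ShortType                => dst.setShort(i, src.getShort(i))
        case IntegerType | DateType   => dst.setInt(i, src.getInt(i))
        case LongType | TimestampType => dst.setLong(i, src.getLong(i))
        case other => throw new RuntimeException("Not implemented. " + other)
      }
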
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4aa046bd91..3161a630af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,11 +38,12 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
// with an empty configuration (it is after all not intended to be used in this way?)
@@ -689,6 +690,52 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
}
+
+ test("VectorizedParquetRecordReader - partition column types") {
+ withTempPath { dir =>
+ Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
+
+ val dataTypes =
+ Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+ FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+ val constantValues =
+ Seq(
+ UTF8String.fromString("a string"),
+ true,
+ 1.toByte,
+ 2.toShort,
+ 3,
+ Long.MaxValue,
+ 0.25.toFloat,
+ 0.75D,
+ Decimal("1234.23456"),
+ DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+ DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+ dataTypes.zip(constantValues).foreach { case (dt, v) =>
+ val schema = StructType(StructField("pcol", dt) :: Nil)
+ val vectorizedReader = new VectorizedParquetRecordReader
+ val partitionValues = new GenericMutableRow(Array(v))
+ val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+
+ try {
+ vectorizedReader.initialize(file, null)
+ vectorizedReader.initBatch(schema, partitionValues)
+ vectorizedReader.nextKeyValue()
+ val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
+
+ // Copy the row into a `GenericMutableRow` before calling get(...), because
+ // get(...) is not implemented on `ColumnarBatch`'s row view.
+ val actual = row.copy().get(1, dt)
+ val expected = v
+ assert(actual == expected)
+ } finally {
+ vectorizedReader.close()
+ }
+ }
+ }
+ }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
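
The new unit test drives VectorizedParquetRecordReader directly with a constant partition row. The same behavior can also be exercised end-to-end through the public DataFrame API; a hedged sketch, assuming a local SparkSession and a scratch path of your choosing:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.lit

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

    val path = "/tmp/parquet-date-partition"  // illustrative scratch path
    Seq(1, 2, 3).toDF("id")
      .withColumn("pd", lit(java.sql.Date.valueOf("2015-01-01")))
      .write.mode("overwrite").partitionBy("pd").parquet(path)

    // The partition column `pd` is reconstituted from the directory name and
    // filled into the columnar batch as a constant DateType vector.
    spark.read.parquet(path).show()
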
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 05d0687fb7..dc4d099f0f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1787,6 +1787,27 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ test("SPARK-17354: Partitioning by dates/timestamps works with Parquet vectorized reader") {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ sql(
+ """CREATE TABLE order(id INT)
+ |PARTITIONED BY (pd DATE, pt TIMESTAMP)
+ |STORED AS PARQUET
+ """.stripMargin)
+
+ sql("set hive.exec.dynamic.partition.mode=nonstrict")
+ sql(
+ """INSERT INTO TABLE order PARTITION(pd, pt)
+ |SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt
+ """.stripMargin)
+ val actual = sql("SELECT * FROM order")
+ val expected = sql(
+ "SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt")
+ checkAnswer(actual, expected)
+ sql("DROP TABLE order")
+ }
+ }
+
def testCommandAvailable(command: String): Boolean = {
val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
attempt.isSuccess && attempt.get == 0
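
For reference, the Hive-side regression covered by the test above can be reproduced outside the suite roughly as follows; a hedged sketch, assuming a Hive-enabled local session (the table name demo_order is illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[1]")
      .enableHiveSupport()
      .getOrCreate()

    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

    spark.sql(
      """CREATE TABLE IF NOT EXISTS demo_order(id INT)
        |PARTITIONED BY (pd DATE, pt TIMESTAMP)
        |STORED AS PARQUET""".stripMargin)
    spark.sql(
      """INSERT INTO TABLE demo_order PARTITION(pd, pt)
        |SELECT 1, CAST('1990-02-24' AS DATE), CAST('1990-02-24' AS TIMESTAMP)""".stripMargin)

    // Before the fix, this scan failed with the vectorized reader enabled;
    // date/timestamp partition values now round-trip correctly.
    spark.sql("SELECT * FROM demo_order").show()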