author     Daoyuan Wang <daoyuan.wang@intel.com>    2015-03-23 11:46:16 +0800
committer  Cheng Lian <lian@databricks.com>         2015-03-23 11:46:16 +0800
commit     4659468f369d69e7f777130e5e3b4c5d47a624f1 (patch)
tree       bddb91224696ae9ebfa36c22bac1923495acbf5f /sql
parent     2bf40c58e6e89e061783c999204107069df17f73 (diff)
[SPARK-4985] [SQL] parquet support for date type
This PR might have some issues with #3732, and would have merge conflicts with #3820, so the review can be delayed until those two are merged.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #3822 from adrian-wang/parquetdate and squashes the following commits:

2c5d54d [Daoyuan Wang] add a test case
faef887 [Daoyuan Wang] parquet support for primitive date
97e9080 [Daoyuan Wang] parquet support for date type
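For context: Catalyst represents a DateType value internally as an Int counting days since the Unix epoch, which is why this patch can map dates onto Parquet's physical INT32 with the DATE original type and copy values through unchanged. A minimal sketch of that encoding in Scala (the helper names are hypothetical; Spark's own DateUtils additionally normalizes timezones):

    import java.sql.Date
    import java.util.concurrent.TimeUnit

    // Days-since-epoch encoding assumed by DateType's Int representation.
    // Timezone normalization is elided here; Spark's DateUtils handles it.
    def dateToDays(d: Date): Int = TimeUnit.MILLISECONDS.toDays(d.getTime).toInt
    def daysToDate(days: Int): Date = new Date(TimeUnit.DAYS.toMillis(days.toLong))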
Diffstat (limited to 'sql')
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala    | 12 ++++++++++++
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala |  2 ++
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala        |  4 ++++
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala      | 15 +++++++++++++++
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala  |  3 ++-
5 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index f898e4b37a..43ca359b51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -127,6 +127,12 @@ private[sql] object CatalystConverter {
parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType])
}
}
+ case DateType => {
+ new CatalystPrimitiveConverter(parent, fieldIndex) {
+ override def addInt(value: Int): Unit =
+ parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType])
+ }
+ }
case d: DecimalType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addBinary(value: Binary): Unit =
@@ -192,6 +198,9 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
updateField(fieldIndex, value)
+ protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+ updateField(fieldIndex, value)
+
protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
updateField(fieldIndex, value)
@@ -388,6 +397,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
current.setInt(fieldIndex, value)
+ override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+ current.update(fieldIndex, value)
+
override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
current.setLong(fieldIndex, value)
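On the read path, parquet-mr delivers a DATE column's value through the primitive converter's addInt callback, and the new updateDate hook forwards the raw day count into the current row without conversion. A standalone sketch of that pattern (the class name is illustrative, and the import assumes the modern org.apache.parquet coordinates rather than the parquet package Spark 1.x compiled against):

    import org.apache.parquet.io.api.PrimitiveConverter

    // DATE columns are physically INT32, so only addInt is ever invoked;
    // the day count is passed through unchanged.
    class DateFieldConverter(update: Int => Unit) extends PrimitiveConverter {
      override def addInt(value: Int): Unit = update(value)
    }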
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 19bfba34b8..5a1b15490d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -212,6 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
case FloatType => writer.addFloat(value.asInstanceOf[Float])
case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
+ case DateType => writer.addInteger(value.asInstanceOf[Int])
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
@@ -358,6 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
case DoubleType => writer.addDouble(record.getDouble(index))
case FloatType => writer.addFloat(record.getFloat(index))
case BooleanType => writer.addBoolean(record.getBoolean(index))
+ case DateType => writer.addInteger(record.getInt(index))
case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
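The write path is symmetric: a DateType value is already an Int day count inside a Catalyst row, so the writer can emit it directly via the record consumer's addInteger with nothing beyond a cast. A hedged sketch of that step (writeDate is a hypothetical helper; RowWriteSupport inlines this logic in its pattern match):

    import org.apache.parquet.io.api.RecordConsumer

    // The Int stored for a DateType column is written verbatim as INT32.
    def writeDate(writer: RecordConsumer, value: Any): Unit =
      writer.addInteger(value.asInstanceOf[Int])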
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 5209581fa8..da668f0686 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -64,6 +64,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
case ParquetPrimitiveTypeName.DOUBLE => DoubleType
case ParquetPrimitiveTypeName.FLOAT => FloatType
+ case ParquetPrimitiveTypeName.INT32
+ if originalType == ParquetOriginalType.DATE => DateType
case ParquetPrimitiveTypeName.INT32 => IntegerType
case ParquetPrimitiveTypeName.INT64 => LongType
case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
@@ -222,6 +224,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
// There is no type for Byte or Short so we promote them to INT32.
case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
+ case DateType => Some(ParquetTypeInfo(
+ ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
case DecimalType.Fixed(precision, scale) if precision <= 18 =>
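This schema conversion is what ties the two directions together: DateType becomes INT32 annotated with the DATE original type, and only INT32 columns carrying that annotation are read back as dates; a bare INT32 still maps to IntegerType, so the guarded case must precede the plain one. A sketch of the resulting Parquet schema, built with parquet-mr's parser (field and message names are arbitrary):

    import org.apache.parquet.schema.MessageTypeParser

    // INT32 plus the DATE annotation is all that distinguishes a date
    // column from a plain integer column in the file footer.
    val schema = MessageTypeParser.parseMessageType(
      """message root {
        |  optional int32 d (DATE);
        |}""".stripMargin)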
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 5438095add..203bc79f15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -135,6 +135,21 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
}
+ test("date type") {
+ def makeDateRDD(): DataFrame =
+ sparkContext
+ .parallelize(0 to 1000)
+ .map(i => Tuple1(DateUtils.toJavaDate(i)))
+ .toDF()
+ .select($"_1")
+
+ withTempPath { dir =>
+ val data = makeDateRDD()
+ data.saveAsParquetFile(dir.getCanonicalPath)
+ checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+ }
+ }
+
test("map") {
val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
checkParquetFile(data)
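The new test exercises the user-visible behaviour end to end. A usage sketch against the Spark 1.3-era API the suite relies on (the output path is arbitrary, an existing sqlContext is assumed, and saveAsParquetFile/parquetFile were later superseded by the DataFrame read/write API):

    import java.sql.Date

    // Round-trip a single date column through Parquet and read it back.
    val df = sqlContext.createDataFrame(Seq(Tuple1(Date.valueOf("2015-03-23"))))
    df.saveAsParquetFile("/tmp/dates.parquet")
    sqlContext.parquetFile("/tmp/dates.parquet").collect()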
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index ad880e2bc3..321832cd43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -57,7 +57,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
|}
""".stripMargin)
- testSchema[(Byte, Short, Int, Long)](
+ testSchema[(Byte, Short, Int, Long, java.sql.Date)](
"logical integral types",
"""
|message root {
@@ -65,6 +65,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
| required int32 _2 (INT_16);
| required int32 _3 (INT_32);
| required int64 _4 (INT_64);
+ | optional int32 _5 (DATE);
|}
""".stripMargin)