From 8161562eabc1eff430cfd9d8eaf413a8c4ef2cfb Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Wed, 27 May 2015 12:42:13 -0700 Subject: [SPARK-7790] [SQL] date and decimal conversion for dynamic partition key Author: Daoyuan Wang Closes #6318 from adrian-wang/dynpart and squashes the following commits: ad73b61 [Daoyuan Wang] not use sqlTestUtils for try catch because dont have sqlcontext here 6c33b51 [Daoyuan Wang] fix according to liancheng f0f8074 [Daoyuan Wang] some specific types as dynamic partition --- .../sql/hive/execution/InsertIntoHiveTable.scala | 2 +- .../spark/sql/hive/hiveWriterContainers.scala | 17 +++++++++-- .../spark/sql/hive/execution/SQLQuerySuite.scala | 33 ++++++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) (limited to 'sql') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index c0b0b104e9..7a6ca48b54 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -106,7 +106,7 @@ case class InsertIntoHiveTable( } writerContainer - .getLocalFileWriter(row) + .getLocalFileWriter(row, table.schema) .write(serializer.serialize(outputData, standardOI)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index cbc381cc81..50b209f7cc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -34,8 +34,10 @@ import org.apache.hadoop.hive.common.FileUtils import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.Row import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.HiveShim._ +import org.apache.spark.sql.types._ /** * Internal helper class that saves an RDD using a Hive OutputFormat. @@ -92,7 +94,7 @@ private[hive] class SparkHiveWriterContainer( "part-" + numberFormat.format(splitID) + extension } - def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer + def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = writer def close() { // Seems the boolean value passed into close does not matter. @@ -195,11 +197,20 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) } - override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = { + override def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = { + def convertToHiveRawString(col: String, value: Any): String = { + val raw = String.valueOf(value) + schema(col).dataType match { + case DateType => DateUtils.toString(raw.toInt) + case _: DecimalType => BigDecimal(raw).toString() + case _ => raw + } + } + val dynamicPartPath = dynamicPartColNames .zip(row.toSeq.takeRight(dynamicPartColNames.length)) .map { case (col, rawVal) => - val string = if (rawVal == null) null else String.valueOf(rawVal) + val string = if (rawVal == null) null else convertToHiveRawString(col, rawVal) val colString = if (string == null || string.isEmpty) { defaultPartName diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index b707f5e684..538e66125c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -837,4 +837,37 @@ class SQLQuerySuite extends QueryTest { java.lang.Math.exp(1.0).toString, java.lang.Math.floor(1.9).toString)) } + + test("dynamic partition value test") { + try { + sql("set hive.exec.dynamic.partition.mode=nonstrict") + // date + sql("drop table if exists dynparttest1") + sql("create table dynparttest1 (value int) partitioned by (pdate date)") + sql( + """ + |insert into table dynparttest1 partition(pdate) + | select count(*), cast('2015-05-21' as date) as pdate from src + """.stripMargin) + checkAnswer( + sql("select * from dynparttest1"), + Seq(Row(500, java.sql.Date.valueOf("2015-05-21")))) + + // decimal + sql("drop table if exists dynparttest2") + sql("create table dynparttest2 (value int) partitioned by (pdec decimal(5, 1))") + sql( + """ + |insert into table dynparttest2 partition(pdec) + | select count(*), cast('100.12' as decimal(5, 1)) as pdec from src + """.stripMargin) + checkAnswer( + sql("select * from dynparttest2"), + Seq(Row(500, new java.math.BigDecimal("100.1")))) + } finally { + sql("drop table if exists dynparttest1") + sql("drop table if exists dynparttest2") + sql("set hive.exec.dynamic.partition.mode=strict") + } + } } -- cgit v1.2.3