author     Daoyuan Wang <daoyuan.wang@intel.com>  2015-05-27 12:42:13 -0700
committer  Yin Huai <yhuai@databricks.com>        2015-05-27 12:42:13 -0700
commit     8161562eabc1eff430cfd9d8eaf413a8c4ef2cfb (patch)
tree       c7df0a8515aa099d50de659b0918a19ebbe0bcff /sql
parent     6fec1a9409b34d8ce58ea1c330b52cc7ef3e7e7e (diff)
[SPARK-7790] [SQL] date and decimal conversion for dynamic partition key
Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #6318 from adrian-wang/dynpart and squashes the following commits:

ad73b61 [Daoyuan Wang] not use sqlTestUtils for try catch because dont have sqlcontext here
6c33b51 [Daoyuan Wang] fix according to liancheng
f0f8074 [Daoyuan Wang] some specific types as dynamic partition
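A minimal, standalone sketch of the conversion this patch addresses (the object and method names below are illustrative, not the actual Spark helpers): a dynamic partition value for a DateType column reaches the writer as days since 1970-01-01 and should be rendered as "yyyy-MM-dd", and a DecimalType value should be canonicalized through BigDecimal rather than written verbatim.

import java.time.LocalDate

object PartitionValueSketch {
  // DateType values are carried as an epoch-day Int; render them as "yyyy-MM-dd"
  // rather than writing the bare integer into the partition path.
  def dateToPartitionString(daysSinceEpoch: Int): String =
    LocalDate.ofEpochDay(daysSinceEpoch.toLong).toString

  // Canonicalize a decimal's string form through BigDecimal so the partition
  // directory name carries a plain decimal literal.
  def decimalToPartitionString(raw: String): String =
    BigDecimal(raw).toString()

  def main(args: Array[String]): Unit = {
    println(dateToPartitionString(16576))      // 2015-05-21
    println(decimalToPartitionString("100.1")) // 100.1
  }
}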
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala          | 17
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala       | 33
3 files changed, 48 insertions, 4 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index c0b0b104e9..7a6ca48b54 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -106,7 +106,7 @@ case class InsertIntoHiveTable(
}
writerContainer
- .getLocalFileWriter(row)
+ .getLocalFileWriter(row, table.schema)
.write(serializer.serialize(outputData, standardOI))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index cbc381cc81..50b209f7cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -34,8 +34,10 @@ import org.apache.hadoop.hive.common.FileUtils
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.types._
/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -92,7 +94,7 @@ private[hive] class SparkHiveWriterContainer(
"part-" + numberFormat.format(splitID) + extension
}
- def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
+ def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = writer
def close() {
// Seems the boolean value passed into close does not matter.
@@ -195,11 +197,20 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
}
- override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
+ override def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = {
+ def convertToHiveRawString(col: String, value: Any): String = {
+ val raw = String.valueOf(value)
+ schema(col).dataType match {
+ case DateType => DateUtils.toString(raw.toInt)
+ case _: DecimalType => BigDecimal(raw).toString()
+ case _ => raw
+ }
+ }
+
val dynamicPartPath = dynamicPartColNames
.zip(row.toSeq.takeRight(dynamicPartColNames.length))
.map { case (col, rawVal) =>
- val string = if (rawVal == null) null else String.valueOf(rawVal)
+ val string = if (rawVal == null) null else convertToHiveRawString(col, rawVal)
val colString =
if (string == null || string.isEmpty) {
defaultPartName
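For context, a standalone sketch of how the converted strings compose into the dynamic partition sub-path (column names and the default partition name below are illustrative; the real writer also escapes each value with Hive's FileUtils.escapePathName before joining):

object PartitionPathSketch {
  // Build a sub-path like "/pdate=2015-05-21/pdec=100.1" from partition
  // column names and their already-converted string values; null or empty
  // values fall back to the default partition name.
  def partitionPath(
      partCols: Seq[String],
      values: Seq[Option[String]],
      defaultPartName: String = "__HIVE_DEFAULT_PARTITION__"): String =
    partCols.zip(values).map { case (col, value) =>
      val colString = value.filter(_.nonEmpty).getOrElse(defaultPartName)
      s"/$col=$colString"
    }.mkString

  def main(args: Array[String]): Unit = {
    // Prints "/pdate=2015-05-21/pdec=100.1"
    println(partitionPath(Seq("pdate", "pdec"), Seq(Some("2015-05-21"), Some("100.1"))))
  }
}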
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b707f5e684..538e66125c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -837,4 +837,37 @@ class SQLQuerySuite extends QueryTest {
java.lang.Math.exp(1.0).toString,
java.lang.Math.floor(1.9).toString))
}
+
+ test("dynamic partition value test") {
+ try {
+ sql("set hive.exec.dynamic.partition.mode=nonstrict")
+ // date
+ sql("drop table if exists dynparttest1")
+ sql("create table dynparttest1 (value int) partitioned by (pdate date)")
+ sql(
+ """
+ |insert into table dynparttest1 partition(pdate)
+ | select count(*), cast('2015-05-21' as date) as pdate from src
+ """.stripMargin)
+ checkAnswer(
+ sql("select * from dynparttest1"),
+ Seq(Row(500, java.sql.Date.valueOf("2015-05-21"))))
+
+ // decimal
+ sql("drop table if exists dynparttest2")
+ sql("create table dynparttest2 (value int) partitioned by (pdec decimal(5, 1))")
+ sql(
+ """
+ |insert into table dynparttest2 partition(pdec)
+ | select count(*), cast('100.12' as decimal(5, 1)) as pdec from src
+ """.stripMargin)
+ checkAnswer(
+ sql("select * from dynparttest2"),
+ Seq(Row(500, new java.math.BigDecimal("100.1"))))
+ } finally {
+ sql("drop table if exists dynparttest1")
+ sql("drop table if exists dynparttest2")
+ sql("set hive.exec.dynamic.partition.mode=strict")
+ }
+ }
}