author     Liang-Chi Hsieh <viirya@gmail.com>          2015-05-22 15:39:58 -0700
committer  Michael Armbrust <michael@databricks.com>   2015-05-22 15:39:58 -0700
commit     126d7235de649ea5619dee6ad3a70970ee90df93 (patch)
tree       ba4bdca7d4c41f4872858748eca748f3942620c2 /sql/hive
parent     e4aef91fe70d6c9765d530b913a9d79103fc27ce (diff)
[SPARK-7270] [SQL] Consider dynamic partition when inserting into hive table
JIRA: https://issues.apache.org/jira/browse/SPARK-7270

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #5864 from viirya/dyn_partition_insert and squashes the following commits:

b5627df [Liang-Chi Hsieh] For comments.
3b21e4b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into dyn_partition_insert
8a4352d [Liang-Chi Hsieh] Consider dynamic partition when inserting into hive table.
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala  20
2 files changed, 33 insertions, 5 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5b6840008f..425a4005aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -516,17 +516,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan)
: LogicalPlan = {
val childOutputDataTypes = child.output.map(_.dataType)
+ val numDynamicPartitions = p.partition.values.count(_.isEmpty)
val tableOutputDataTypes =
- (table.attributes ++ table.partitionKeys).take(child.output.length).map(_.dataType)
+ (table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions))
+ .take(child.output.length).map(_.dataType)
if (childOutputDataTypes == tableOutputDataTypes) {
- p
+ InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists)
} else if (childOutputDataTypes.size == tableOutputDataTypes.size &&
childOutputDataTypes.zip(tableOutputDataTypes)
.forall { case (left, right) => left.sameType(right) }) {
// If both types ignoring nullability of ArrayType, MapType, StructType are the same,
// use InsertIntoHiveTable instead of InsertIntoTable.
- InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite, p.ifNotExists)
+ InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists)
} else {
// Only do the casting when child output data types differ from table output data types.
val castedChildOutput = child.output.zip(table.output).map {
@@ -561,7 +563,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* because Hive table doesn't have nullability for ARRAY, MAP, STRUCT types.
*/
private[hive] case class InsertIntoHiveTable(
- table: LogicalPlan,
+ table: MetastoreRelation,
partition: Map[String, Option[String]],
child: LogicalPlan,
overwrite: Boolean,
@@ -571,7 +573,13 @@ private[hive] case class InsertIntoHiveTable(
override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = child.output
- override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
+ val numDynamicPartitions = partition.values.count(_.isEmpty)
+
+ // This is the expected schema of the table prepared to be inserted into,
+ // including dynamic partition columns.
+ val tableOutput = table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions)
+
+ override lazy val resolved: Boolean = childrenResolved && child.output.zip(tableOutput).forall {
case (childAttr, tableAttr) => childAttr.dataType.sameType(tableAttr.dataType)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 65c6ef03bf..4af31d482c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.{SparkFiles, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
@@ -415,6 +416,25 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
|SELECT * FROM createdtable;
""".stripMargin)
+ test("SPARK-7270: consider dynamic partition when comparing table output") {
+ sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)")
+ sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)")
+
+ val analyzedPlan = sql(
+ """
+ |INSERT OVERWRITE table test_partition PARTITION (b=1, c)
+ |SELECT 'a', 'c' from ptest
+ """.stripMargin).queryExecution.analyzed
+
+ assertResult(false, "Incorrect cast detected\n" + analyzedPlan) {
+ var hasCast = false
+ analyzedPlan.collect {
+ case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c }
+ }
+ hasCast
+ }
+ }
+
createQueryTest("transform",
"SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src")