From 126d7235de649ea5619dee6ad3a70970ee90df93 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 22 May 2015 15:39:58 -0700 Subject: [SPARK-7270] [SQL] Consider dynamic partition when inserting into hive table JIRA: https://issues.apache.org/jira/browse/SPARK-7270 Author: Liang-Chi Hsieh Closes #5864 from viirya/dyn_partition_insert and squashes the following commits: b5627df [Liang-Chi Hsieh] For comments. 3b21e4b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into dyn_partition_insert 8a4352d [Liang-Chi Hsieh] Consider dynamic partition when inserting into hive table. --- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 18 +++++++++++++----- .../spark/sql/hive/execution/HiveQuerySuite.scala | 20 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 5b6840008f..425a4005aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -516,17 +516,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) : LogicalPlan = { val childOutputDataTypes = child.output.map(_.dataType) + val numDynamicPartitions = p.partition.values.count(_.isEmpty) val tableOutputDataTypes = - (table.attributes ++ table.partitionKeys).take(child.output.length).map(_.dataType) + (table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions)) + .take(child.output.length).map(_.dataType) if (childOutputDataTypes == tableOutputDataTypes) { - p + InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists) } else if (childOutputDataTypes.size == tableOutputDataTypes.size && childOutputDataTypes.zip(tableOutputDataTypes) .forall { case (left, right) => left.sameType(right) }) { // If both types ignoring nullability of ArrayType, MapType, StructType are the same, // use InsertIntoHiveTable instead of InsertIntoTable. - InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite, p.ifNotExists) + InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists) } else { // Only do the casting when child output data types differ from table output data types. val castedChildOutput = child.output.zip(table.output).map { @@ -561,7 +563,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive * because Hive table doesn't have nullability for ARRAY, MAP, STRUCT types. */ private[hive] case class InsertIntoHiveTable( - table: LogicalPlan, + table: MetastoreRelation, partition: Map[String, Option[String]], child: LogicalPlan, overwrite: Boolean, @@ -571,7 +573,13 @@ private[hive] case class InsertIntoHiveTable( override def children: Seq[LogicalPlan] = child :: Nil override def output: Seq[Attribute] = child.output - override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall { + val numDynamicPartitions = partition.values.count(_.isEmpty) + + // This is the expected schema of the table prepared to be inserted into, + // including dynamic partition columns. + val tableOutput = table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions) + + override lazy val resolved: Boolean = childrenResolved && child.output.zip(tableOutput).forall { case (childAttr, tableAttr) => childAttr.dataType.sameType(tableAttr.dataType) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 65c6ef03bf..4af31d482c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.spark.{SparkFiles, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Row} +import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive @@ -415,6 +416,25 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { |SELECT * FROM createdtable; """.stripMargin) + test("SPARK-7270: consider dynamic partition when comparing table output") { + sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)") + sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)") + + val analyzedPlan = sql( + """ + |INSERT OVERWRITE table test_partition PARTITION (b=1, c) + |SELECT 'a', 'c' from ptest + """.stripMargin).queryExecution.analyzed + + assertResult(false, "Incorrect cast detected\n" + analyzedPlan) { + var hasCast = false + analyzedPlan.collect { + case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c } + } + hasCast + } + } + createQueryTest("transform", "SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src") -- cgit v1.2.3