author     Cheng Lian <lian.cs.zju@gmail.com>          2014-10-08 18:11:18 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-10-08 18:11:18 -0700
commit     e7033572330bd48b2438f218b0d2cd3fccdeb362 (patch)
tree       899c689996c0f3edcf1d19c83603fe1528be298e /sql
parent     4ec931951fea4efbfe5db39cf581704df7d2775b (diff)
[SPARK-3810][SQL] Makes PreInsertionCasts handle partitions properly
Takes partition keys into account when applying the `PreInsertionCasts` rule.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2672 from liancheng/fix-pre-insert-casts and squashes the following commits:

def1a1a [Cheng Lian] Makes PreInsertionCasts handle partitions properly
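For context, a minimal sketch of the pre-fix failure mode (hypothetical names; plain Seq[String]s stand in for the actual Catalyst DataTypes). With a static-partition INSERT such as the one in the tests below, the SELECT supplies only the data columns, so the old rule's expected type list (attributes followed by partition keys) was always longer than the child's output and the type comparison could never succeed:

// Hypothetical sketch of the pre-fix failure mode; not the actual
// Catalyst types. `withparts LIKE srcpart` has data columns (key, value)
// and partition keys (ds, hr).
object PreFixSketch extends App {
  val attributeTypes = Seq("int", "string")    // key, value
  val partitionTypes = Seq("string", "string") // ds, hr

  // Static-partition INSERT: SELECT key, value FROM src yields 2 types.
  val childOutputTypes = Seq("int", "string")

  // The old rule compared the child's types against all four expected
  // types, so the check failed for every static-partition insert.
  val oldExpected = attributeTypes ++ partitionTypes
  assert(childOutputTypes != oldExpected) // lengths 2 vs. 4: never equal
}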
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      | 15
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala  | 36
2 files changed, 41 insertions(+), 10 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index cc0605b0ad..addd5bed84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,31 +19,28 @@ package org.apache.spark.sql.hive
import scala.util.parsing.combinator.RegexParsers
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo}
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Partition => TPartition, Table => TTable}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, Catalog}
+import org.apache.spark.sql.catalyst.analysis.Catalog
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.util.Utils
/* Implicit conversions */
import scala.collection.JavaConversions._
private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
- import HiveMetastoreTypes._
+ import org.apache.spark.sql.hive.HiveMetastoreTypes._
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)
@@ -137,10 +134,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = {
val childOutputDataTypes = child.output.map(_.dataType)
- // Only check attributes, not partitionKeys since they are always strings.
- // TODO: Fully support inserting into partitioned tables.
val tableOutputDataTypes =
- table.attributes.map(_.dataType) ++ table.partitionKeys.map(_.dataType)
+ (table.attributes ++ table.partitionKeys).take(child.output.length).map(_.dataType)
if (childOutputDataTypes == tableOutputDataTypes) {
p
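The `.take(child.output.length)` above is the heart of the patch. A minimal sketch (same hypothetical stand-ins as in the earlier sketch) of how truncating the expected types to the child's arity makes the comparison line up under both static and dynamic partitioning:

// Hypothetical sketch of the fixed comparison; plain Seq[String]s stand
// in for the actual Catalyst DataTypes.
object PostFixSketch extends App {
  val attributeTypes = Seq("int", "string")    // data columns: key, value
  val partitionTypes = Seq("string", "string") // partition keys: ds, hr

  // Expected types are truncated to the child's arity, mirroring
  // `(table.attributes ++ table.partitionKeys).take(child.output.length)`.
  def expected(childArity: Int): Seq[String] =
    (attributeTypes ++ partitionTypes).take(childArity)

  // Static partitioning: SELECT key, value gives arity 2; the partition
  // key types are dropped from the comparison.
  assert(expected(2) == Seq("int", "string"))

  // With all four columns in the child (e.g. a dynamic insert whose
  // SELECT also produces ds and hr), nothing is dropped.
  assert(expected(4) == Seq("int", "string", "string", "string"))
}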
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2e282a9ade..2829105f43 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -22,6 +22,7 @@ import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
@@ -675,6 +676,41 @@ class HiveQuerySuite extends HiveComparisonTest {
sql("SELECT * FROM boom").queryExecution.analyzed
}
+ test("SPARK-3810: PreInsertionCasts static partitioning support") {
+ val analyzedPlan = {
+ loadTestTable("srcpart")
+ sql("DROP TABLE IF EXISTS withparts")
+ sql("CREATE TABLE withparts LIKE srcpart")
+ sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src")
+ .queryExecution.analyzed
+ }
+
+ assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+ analyzedPlan.collect {
+ case _: Project => ()
+ }.size
+ }
+ }
+
+ test("SPARK-3810: PreInsertionCasts dynamic partitioning support") {
+ val analyzedPlan = {
+ loadTestTable("srcpart")
+ sql("DROP TABLE IF EXISTS withparts")
+ sql("CREATE TABLE withparts LIKE srcpart")
+ sql("SET hive.exec.dynamic.partition.mode=nonstrict")
+
+ sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart")
+ sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value FROM src")
+ .queryExecution.analyzed
+ }
+
+ assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+ analyzedPlan.collect {
+ case _: Project => ()
+ }.size
+ }
+ }
+
test("parse HQL set commands") {
// Adapted from its SQL counterpart.
val testKey = "spark.sql.key.usedfortestonly"