diff options
Diffstat (limited to 'sql/hive/src/main')
5 files changed, 59 insertions, 25 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 3ed5c5b031..f1c0bd92aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -527,7 +527,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // Collects all `MetastoreRelation`s which should be replaced val toBeReplaced = plan.collect { // Write path - case InsertIntoTable(relation: MetastoreRelation, _, _, _) + case InsertIntoTable(relation: MetastoreRelation, _, _, _, _) // Inserting into partitioned table is not supported in Parquet data source (yet). if !relation.hiveQlTable.isPartitioned && hive.convertMetastoreParquet && @@ -538,7 +538,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with (relation, parquetRelation, attributedRewrites) // Write path - case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _) + case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _) // Inserting into partitioned table is not supported in Parquet data source (yet). if !relation.hiveQlTable.isPartitioned && hive.convertMetastoreParquet && @@ -569,15 +569,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val alias = r.alias.getOrElse(r.tableName) Subquery(alias, parquetRelation) - case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite) + case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) if relationMap.contains(r) => val parquetRelation = relationMap(r) - InsertIntoTable(parquetRelation, partition, child, overwrite) + InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists) - case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite) + case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) if relationMap.contains(r) => val parquetRelation = relationMap(r) - InsertIntoTable(parquetRelation, partition, child, overwrite) + InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists) case other => other.transformExpressions { case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) @@ -698,7 +698,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // Wait until children are resolved. case p: LogicalPlan if !p.childrenResolved => p - case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) => + case p @ InsertIntoTable(table: MetastoreRelation, _, child, _, _) => castChildOutput(p, table, child) } @@ -715,7 +715,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with .forall { case (left, right) => left.sameType(right) }) { // If both types ignoring nullability of ArrayType, MapType, StructType are the same, // use InsertIntoHiveTable instead of InsertIntoTable. - InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite) + InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite, p.ifNotExists) } else { // Only do the casting when child output data types differ from table output data types. val castedChildOutput = child.output.zip(table.output).map { @@ -753,7 +753,8 @@ private[hive] case class InsertIntoHiveTable( table: LogicalPlan, partition: Map[String, Option[String]], child: LogicalPlan, - overwrite: Boolean) + overwrite: Boolean, + ifNotExists: Boolean) extends LogicalPlan { override def children: Seq[LogicalPlan] = child :: Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index b2ae74efeb..53a204b8c2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1002,7 +1002,27 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C cleanIdentifier(key.toLowerCase) -> None }.toMap).getOrElse(Map.empty) - InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite) + InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, false) + + case Token(destinationToken(), + Token("TOK_TAB", + tableArgs) :: + Token("TOK_IFNOTEXISTS", + ifNotExists) :: Nil) => + val Some(tableNameParts) :: partitionClause :: Nil = + getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs) + + val tableIdent = extractTableIdent(tableNameParts) + + val partitionKeys = partitionClause.map(_.getChildren.map { + // Parse partitions. We also make keys case insensitive. + case Token("TOK_PARTVAL", Token(key, Nil) :: Token(value, Nil) :: Nil) => + cleanIdentifier(key.toLowerCase) -> Some(PlanUtils.stripQuotes(value)) + case Token("TOK_PARTVAL", Token(key, Nil) :: Nil) => + cleanIdentifier(key.toLowerCase) -> None + }.toMap).getOrElse(Map.empty) + + InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, true) case a: ASTNode => throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 5f7e897295..1ccb0c279c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -184,12 +184,14 @@ private[hive] trait HiveStrategies { object DataSinks extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) => + case logical.InsertIntoTable( + table: MetastoreRelation, partition, child, overwrite, ifNotExists) => execution.InsertIntoHiveTable( - table, partition, planLater(child), overwrite) :: Nil - case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, child, overwrite) => + table, partition, planLater(child), overwrite, ifNotExists) :: Nil + case hive.InsertIntoHiveTable( + table: MetastoreRelation, partition, child, overwrite, ifNotExists) => execution.InsertIntoHiveTable( - table, partition, planLater(child), overwrite) :: Nil + table, partition, planLater(child), overwrite, ifNotExists) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index fade9e5852..76a1965f3c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -67,7 +67,7 @@ case class CreateTableAsSelect( new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName") } } else { - hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd + hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 6c96747439..89995a91b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -45,7 +45,8 @@ case class InsertIntoHiveTable( table: MetastoreRelation, partition: Map[String, Option[String]], child: SparkPlan, - overwrite: Boolean) extends UnaryNode with HiveInspectors { + overwrite: Boolean, + ifNotExists: Boolean) extends UnaryNode with HiveInspectors { @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass @@ -219,15 +220,25 @@ case class InsertIntoHiveTable( isSkewedStoreAsSubdir) } } else { - catalog.synchronized { - catalog.client.loadPartition( - outputPath, - qualifiedTableName, - orderedPartitionSpec, - overwrite, - holdDDLTime, - inheritTableSpecs, - isSkewedStoreAsSubdir) + // scalastyle:off + // ifNotExists is only valid with static partition, refer to + // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries + // scalastyle:on + val oldPart = catalog.synchronized { + catalog.client.getPartition( + catalog.client.getTable(qualifiedTableName), partitionSpec, false) + } + if (oldPart == null || !ifNotExists) { + catalog.synchronized { + catalog.client.loadPartition( + outputPath, + qualifiedTableName, + orderedPartitionSpec, + overwrite, + holdDDLTime, + inheritTableSpecs, + isSkewedStoreAsSubdir) + } } } } else { |