From e09ab5da8b02da98d7b2496d549c1d53cceb8728 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 22 Apr 2016 18:26:28 +0800
Subject: [SPARK-14609][SQL] Native support for LOAD DATA DDL command

## What changes were proposed in this pull request?

Add native support for the LOAD DATA DDL command, which loads data into a Hive table/partition.

## How was this patch tested?

`HiveDDLCommandSuite` and `HiveQuerySuite`. Besides, a few Hive tests (`WindowQuerySuite`, `HiveTableScanSuite` and `HiveSerDeSuite`) also use the `LOAD DATA` command.

Author: Liang-Chi Hsieh

Closes #12412 from viirya/ddl-load-data.
---
 .../spark/sql/execution/SparkSqlParser.scala      |  25 ++++
 .../apache/spark/sql/execution/command/ddl.scala  |   2 +-
 .../spark/sql/execution/command/tables.scala      | 130 ++++++++++++++++++++-
 3 files changed, 154 insertions(+), 3 deletions(-)

(limited to 'sql/core/src/main/scala/org/apache')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 9e69274311..e983a4cee6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -271,6 +271,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
   }
 
+  /**
+   * Create a [[LoadData]] command.
+   *
+   * For example:
+   * {{{
+   *   LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+   *   [PARTITION (partcol1=val1, partcol2=val2 ...)]
+   * }}}
+   */
+  override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
+    LoadData(
+      table = visitTableIdentifier(ctx.tableIdentifier),
+      path = string(ctx.path),
+      isLocal = ctx.LOCAL != null,
+      isOverwrite = ctx.OVERWRITE != null,
+      partition = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
+    )
+  }
+
   /**
    * Convert a table property list into a key-value map.
    */
@@ -954,6 +973,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
    * Create a [[CreateTableLike]] command.
+   *
+   * For example:
+   * {{{
+   *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   LIKE [other_db_name.]existing_table_name
+   * }}}
    */
   override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
     val targetTable = visitTableIdentifier(ctx.target)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fc37a142cd..85f0066f3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -508,7 +508,7 @@ case class AlterTableReplaceCol(
   extends NativeDDLCommand(sql) with Logging
 
 
-private object DDLUtils {
+private[sql] object DDLUtils {
 
   def isDatasourceTable(props: Map[String, String]): Boolean = {
     props.contains("spark.sql.sources.provider")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 43fb38484d..11612c20d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -17,15 +17,18 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
+import java.net.URI
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.types.{MetadataBuilder, StringType}
-
+import org.apache.spark.util.Utils
 
 case class CreateTableAsSelectLogicalPlan(
     tableDesc: CatalogTable,
@@ -139,6 +142,129 @@ case class AlterTableRename(
 
 }
 
+/**
+ * A command that loads data into a Hive table.
+ *
+ * The syntax of this command is:
+ * {{{
+ *  LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+ *  [PARTITION (partcol1=val1, partcol2=val2 ...)]
+ * }}}
+ */
+case class LoadData(
+    table: TableIdentifier,
+    path: String,
+    isLocal: Boolean,
+    isOverwrite: Boolean,
+    partition: Option[ExternalCatalog.TablePartitionSpec]) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    if (!catalog.tableExists(table)) {
+      throw new AnalysisException(
+        s"Table in LOAD DATA does not exist: '$table'")
+    }
+
+    val targetTable = catalog.getTableMetadataOption(table).getOrElse {
+      throw new AnalysisException(
+        s"Table in LOAD DATA cannot be temporary: '$table'")
+    }
+
+    if (DDLUtils.isDatasourceTable(targetTable)) {
+      throw new AnalysisException(
+        "LOAD DATA is not supported for datasource tables")
+    }
+
+    if (targetTable.partitionColumnNames.nonEmpty) {
+      if (partition.isEmpty || targetTable.partitionColumnNames.size != partition.get.size) {
+        throw new AnalysisException(
+          "LOAD DATA to partitioned table must specify a specific partition of " +
+            "the table by specifying values for all of the partitioning columns.")
+      }
+
+      partition.get.keys.foreach { colName =>
+        if (!targetTable.partitionColumnNames.contains(colName)) {
+          throw new AnalysisException(
+            s"LOAD DATA to partitioned table specifies a non-existing partition column: '$colName'")
+        }
+      }
+    } else {
+      if (partition.nonEmpty) {
+        throw new AnalysisException(
+          "LOAD DATA to non-partitioned table cannot specify partition.")
+      }
+    }
+
+    val loadPath =
+      if (isLocal) {
+        val uri = Utils.resolveURI(path)
+        if (!new File(uri.getPath()).exists()) {
+          throw new AnalysisException(s"LOAD DATA with non-existing path: $path")
+        }
+        uri
+      } else {
+        val uri = new URI(path)
+        if (uri.getScheme() != null && uri.getAuthority() != null) {
+          uri
+        } else {
+          // Follow Hive's behavior:
+          // If no schema or authority is provided with non-local inpath,
+          // we will use hadoop configuration "fs.default.name".
+          val defaultFSConf = sqlContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+          val defaultFS = if (defaultFSConf == null) {
+            new URI("")
+          } else {
+            new URI(defaultFSConf)
+          }
+
+          val scheme = if (uri.getScheme() != null) {
+            uri.getScheme()
+          } else {
+            defaultFS.getScheme()
+          }
+          val authority = if (uri.getAuthority() != null) {
+            uri.getAuthority()
+          } else {
+            defaultFS.getAuthority()
+          }
+
+          if (scheme == null) {
+            throw new AnalysisException(
+              "LOAD DATA with non-local path must specify URI Scheme.")
+          }
+
+          // Follow Hive's behavior:
+          // If LOCAL is not specified, and the path is relative,
+          // then the path is interpreted relative to "/user/<username>"
+          val uriPath = uri.getPath()
+          val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
+            uriPath
+          } else {
+            s"/user/${System.getProperty("user.name")}/$uriPath"
+          }
+          new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
+        }
+      }
+
+    if (partition.nonEmpty) {
+      catalog.loadPartition(
+        targetTable.identifier,
+        loadPath.toString,
+        partition.get,
+        isOverwrite,
+        holdDDLTime = false,
+        inheritTableSpecs = true,
+        isSkewedStoreAsSubdir = false)
+    } else {
+      catalog.loadTable(
+        targetTable.identifier,
+        loadPath.toString,
+        isOverwrite,
+        holdDDLTime = false)
+    }
+    Seq.empty[Row]
+  }
+}
 
 /**
  * Command that looks like
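
For reference, and not part of the patch itself: a minimal sketch of how the command added here could be exercised from Spark SQL once this change is in. It assumes a Hive-enabled context named `hiveContext` and a tab-delimited local file `/tmp/kv1.txt`; both names are illustrative, not taken from the PR.

```scala
// Illustrative usage sketch only -- `hiveContext` is an assumed Hive-enabled
// SQLContext and /tmp/kv1.txt is a hypothetical local file with key<TAB>value rows.

// A partitioned Hive table to load into.
hiveContext.sql(
  """CREATE TABLE IF NOT EXISTS kv_part (key INT, value STRING)
    |PARTITIONED BY (ds STRING)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'""".stripMargin)

// With this patch, the statement below is planned through the new visitLoadData
// rule into the LoadData runnable command added in tables.scala.
hiveContext.sql(
  "LOAD DATA LOCAL INPATH '/tmp/kv1.txt' OVERWRITE INTO TABLE kv_part " +
    "PARTITION (ds='2016-04-22')")

// Sanity check: the loaded partition should now be queryable.
hiveContext.sql("SELECT COUNT(*) FROM kv_part WHERE ds = '2016-04-22'").show()
```

The point of the sketch is the planning path: after this change the LOAD DATA statement is handled natively by Spark's parser and catalog (loadTable/loadPartition) rather than being passed through to Hive as an opaque native command.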