diff options
author | Liang-Chi Hsieh <simonh@tw.ibm.com> | 2016-04-22 18:26:28 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-04-22 18:26:28 +0800 |
commit | e09ab5da8b02da98d7b2496d549c1d53cceb8728 (patch) | |
tree | f9aa98a66a2cfdb669d751d3f17d170e99eba0f8 /sql/core/src/main/scala/org/apache | |
parent | 284b15d2fbff7c0c3ffe8737838071d366ea5742 (diff) | |
download | spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.tar.gz spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.tar.bz2 spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.zip |
[SPARK-14609][SQL] Native support for LOAD DATA DDL command
## What changes were proposed in this pull request?
Add the native support for LOAD DATA DDL command that loads data into Hive table/partition.
## How was this patch tested?
`HiveDDLCommandSuite` and `HiveQuerySuite`. Besides, few Hive tests (`WindowQuerySuite`, `HiveTableScanSuite` and `HiveSerDeSuite`) also use `LOAD DATA` command.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #12412 from viirya/ddl-load-data.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
3 files changed, 154 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 9e69274311..e983a4cee6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -272,6 +272,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** + * Create a [[LoadData]] command. + * + * For example: + * {{{ + * LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename + * [PARTITION (partcol1=val1, partcol2=val2 ...)] + * }}} + */ + override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) { + LoadData( + table = visitTableIdentifier(ctx.tableIdentifier), + path = string(ctx.path), + isLocal = ctx.LOCAL != null, + isOverwrite = ctx.OVERWRITE != null, + partition = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) + ) + } + + /** * Convert a table property list into a key-value map. */ override def visitTablePropertyList( @@ -954,6 +973,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { /** * Create a [[CreateTableLike]] command. + * + * For example: + * {{{ + * CREATE TABLE [IF NOT EXISTS] [db_name.]table_name + * LIKE [other_db_name.]existing_table_name + * }}} */ override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) { val targetTable = visitTableIdentifier(ctx.target) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index fc37a142cd..85f0066f3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -508,7 +508,7 @@ case class AlterTableReplaceCol( extends NativeDDLCommand(sql) with Logging -private object DDLUtils { +private[sql] object DDLUtils { def isDatasourceTable(props: Map[String, String]): Boolean = { props.contains("spark.sql.sources.provider") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 43fb38484d..11612c20d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -17,15 +17,18 @@ package org.apache.spark.sql.execution.command +import java.io.File +import java.net.URI + import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, SimpleCatalogRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog, SimpleCatalogRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} import org.apache.spark.sql.types.{MetadataBuilder, StringType} - +import org.apache.spark.util.Utils case class CreateTableAsSelectLogicalPlan( tableDesc: CatalogTable, @@ -139,6 +142,129 @@ case class AlterTableRename( } +/** + * A command that loads data into a Hive table. + * + * The syntax of this command is: + * {{{ + * LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename + * [PARTITION (partcol1=val1, partcol2=val2 ...)] + * }}} + */ +case class LoadData( + table: TableIdentifier, + path: String, + isLocal: Boolean, + isOverwrite: Boolean, + partition: Option[ExternalCatalog.TablePartitionSpec]) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + if (!catalog.tableExists(table)) { + throw new AnalysisException( + s"Table in LOAD DATA does not exist: '$table'") + } + + val targetTable = catalog.getTableMetadataOption(table).getOrElse { + throw new AnalysisException( + s"Table in LOAD DATA cannot be temporary: '$table'") + } + + if (DDLUtils.isDatasourceTable(targetTable)) { + throw new AnalysisException( + "LOAD DATA is not supported for datasource tables") + } + + if (targetTable.partitionColumnNames.nonEmpty) { + if (partition.isEmpty || targetTable.partitionColumnNames.size != partition.get.size) { + throw new AnalysisException( + "LOAD DATA to partitioned table must specify a specific partition of " + + "the table by specifying values for all of the partitioning columns.") + } + + partition.get.keys.foreach { colName => + if (!targetTable.partitionColumnNames.contains(colName)) { + throw new AnalysisException( + s"LOAD DATA to partitioned table specifies a non-existing partition column: '$colName'") + } + } + } else { + if (partition.nonEmpty) { + throw new AnalysisException( + "LOAD DATA to non-partitioned table cannot specify partition.") + } + } + + val loadPath = + if (isLocal) { + val uri = Utils.resolveURI(path) + if (!new File(uri.getPath()).exists()) { + throw new AnalysisException(s"LOAD DATA with non-existing path: $path") + } + uri + } else { + val uri = new URI(path) + if (uri.getScheme() != null && uri.getAuthority() != null) { + uri + } else { + // Follow Hive's behavior: + // If no schema or authority is provided with non-local inpath, + // we will use hadoop configuration "fs.default.name". + val defaultFSConf = sqlContext.sparkContext.hadoopConfiguration.get("fs.default.name") + val defaultFS = if (defaultFSConf == null) { + new URI("") + } else { + new URI(defaultFSConf) + } + + val scheme = if (uri.getScheme() != null) { + uri.getScheme() + } else { + defaultFS.getScheme() + } + val authority = if (uri.getAuthority() != null) { + uri.getAuthority() + } else { + defaultFS.getAuthority() + } + + if (scheme == null) { + throw new AnalysisException( + "LOAD DATA with non-local path must specify URI Scheme.") + } + + // Follow Hive's behavior: + // If LOCAL is not specified, and the path is relative, + // then the path is interpreted relative to "/user/<username>" + val uriPath = uri.getPath() + val absolutePath = if (uriPath != null && uriPath.startsWith("/")) { + uriPath + } else { + s"/user/${System.getProperty("user.name")}/$uriPath" + } + new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment()) + } + } + + if (partition.nonEmpty) { + catalog.loadPartition( + targetTable.identifier, + loadPath.toString, + partition.get, + isOverwrite, + holdDDLTime = false, + inheritTableSpecs = true, + isSkewedStoreAsSubdir = false) + } else { + catalog.loadTable( + targetTable.identifier, + loadPath.toString, + isOverwrite, + holdDDLTime = false) + } + Seq.empty[Row] + } +} /** * Command that looks like |