author    Liang-Chi Hsieh <simonh@tw.ibm.com>    2016-04-22 18:26:28 +0800
committer Wenchen Fan <wenchen@databricks.com>   2016-04-22 18:26:28 +0800
commit    e09ab5da8b02da98d7b2496d549c1d53cceb8728 (patch)
tree      f9aa98a66a2cfdb669d751d3f17d170e99eba0f8 /sql/core/src/main/scala/org/apache
parent    284b15d2fbff7c0c3ffe8737838071d366ea5742 (diff)
[SPARK-14609][SQL] Native support for LOAD DATA DDL command
## What changes were proposed in this pull request?

Add native support for the LOAD DATA DDL command, which loads data into a Hive table/partition.

## How was this patch tested?

`HiveDDLCommandSuite` and `HiveQuerySuite`. Besides these, a few other Hive tests (`WindowQuerySuite`, `HiveTableScanSuite` and `HiveSerDeSuite`) also use the `LOAD DATA` command.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #12412 from viirya/ddl-load-data.
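As an illustrative sketch (not part of the patch), the command can now be issued directly through Spark SQL; the table name and file path below are hypothetical:

```scala
// Hypothetical usage sketch: load a local file into a partitioned Hive table.
// `sqlContext` is assumed to be a Hive-enabled SQLContext; all names are made up.
sqlContext.sql(
  "CREATE TABLE page_views (url STRING, hits INT) PARTITIONED BY (ds STRING)")
sqlContext.sql(
  "LOAD DATA LOCAL INPATH '/tmp/page_views.txt' " +
    "OVERWRITE INTO TABLE page_views PARTITION (ds='2016-04-22')")
```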
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala   |  25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala      |   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala   | 130
3 files changed, 154 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 9e69274311..e983a4cee6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -272,6 +272,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create a [[LoadData]] command.
+ *
+ * For example:
+ * {{{
+ * LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+ * [PARTITION (partcol1=val1, partcol2=val2 ...)]
+ * }}}
+ */
+ override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
+ LoadData(
+ table = visitTableIdentifier(ctx.tableIdentifier),
+ path = string(ctx.path),
+ isLocal = ctx.LOCAL != null,
+ isOverwrite = ctx.OVERWRITE != null,
+ partition = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
+ )
+ }
+
+ /**
* Convert a table property list into a key-value map.
*/
override def visitTablePropertyList(
@@ -954,6 +973,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
/**
* Create a [[CreateTableLike]] command.
+ *
+ * For example:
+ * {{{
+ * CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+ * LIKE [other_db_name.]existing_table_name
+ * }}}
*/
override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
val targetTable = visitTableIdentifier(ctx.target)
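For reference, a rough sketch of the logical plan the new `visitLoadData` rule should produce for a sample statement; the table name, path and partition value are hypothetical, and the partition spec is assumed to be a plain `Map[String, String]`:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.LoadData

// "LOAD DATA LOCAL INPATH '/tmp/data.txt' OVERWRITE INTO TABLE t PARTITION (ds='1')"
// is expected to parse into roughly this plan:
val expected = LoadData(
  table = TableIdentifier("t"),
  path = "/tmp/data.txt",
  isLocal = true,
  isOverwrite = true,
  partition = Some(Map("ds" -> "1")))
```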
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fc37a142cd..85f0066f3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -508,7 +508,7 @@ case class AlterTableReplaceCol(
extends NativeDDLCommand(sql) with Logging
-private object DDLUtils {
+private[sql] object DDLUtils {
def isDatasourceTable(props: Map[String, String]): Boolean = {
props.contains("spark.sql.sources.provider")
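The `private` to `private[sql]` change above exists so the new command in `tables.scala` can reuse this helper. A minimal sketch of the check it performs, with a hypothetical property map:

```scala
// DDLUtils.isDatasourceTable keys off the property written by the data source
// write path; the map below is a hypothetical example.
val props = Map("spark.sql.sources.provider" -> "parquet")
val isDatasourceTable = props.contains("spark.sql.sources.provider") // true
```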
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 43fb38484d..11612c20d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -17,15 +17,18 @@
package org.apache.spark.sql.execution.command
+import java.io.File
+import java.net.URI
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.types.{MetadataBuilder, StringType}
-
+import org.apache.spark.util.Utils
case class CreateTableAsSelectLogicalPlan(
tableDesc: CatalogTable,
@@ -139,6 +142,129 @@ case class AlterTableRename(
}
+/**
+ * A command that loads data into a Hive table.
+ *
+ * The syntax of this command is:
+ * {{{
+ * LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+ * [PARTITION (partcol1=val1, partcol2=val2 ...)]
+ * }}}
+ */
+case class LoadData(
+ table: TableIdentifier,
+ path: String,
+ isLocal: Boolean,
+ isOverwrite: Boolean,
+ partition: Option[ExternalCatalog.TablePartitionSpec]) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ if (!catalog.tableExists(table)) {
+ throw new AnalysisException(
+ s"Table in LOAD DATA does not exist: '$table'")
+ }
+
+ val targetTable = catalog.getTableMetadataOption(table).getOrElse {
+ throw new AnalysisException(
+ s"Table in LOAD DATA cannot be temporary: '$table'")
+ }
+
+ if (DDLUtils.isDatasourceTable(targetTable)) {
+ throw new AnalysisException(
+ "LOAD DATA is not supported for datasource tables")
+ }
+
+ if (targetTable.partitionColumnNames.nonEmpty) {
+ if (partition.isEmpty || targetTable.partitionColumnNames.size != partition.get.size) {
+ throw new AnalysisException(
+ "LOAD DATA to partitioned table must specify a specific partition of " +
+ "the table by specifying values for all of the partitioning columns.")
+ }
+
+ partition.get.keys.foreach { colName =>
+ if (!targetTable.partitionColumnNames.contains(colName)) {
+ throw new AnalysisException(
+ s"LOAD DATA to partitioned table specifies a non-existing partition column: '$colName'")
+ }
+ }
+ } else {
+ if (partition.nonEmpty) {
+ throw new AnalysisException(
+ "LOAD DATA to non-partitioned table cannot specify partition.")
+ }
+ }
+
+ val loadPath =
+ if (isLocal) {
+ val uri = Utils.resolveURI(path)
+ if (!new File(uri.getPath()).exists()) {
+ throw new AnalysisException(s"LOAD DATA with non-existing path: $path")
+ }
+ uri
+ } else {
+ val uri = new URI(path)
+ if (uri.getScheme() != null && uri.getAuthority() != null) {
+ uri
+ } else {
+ // Follow Hive's behavior:
+ // If no schema or authority is provided with non-local inpath,
+ // we will use hadoop configuration "fs.default.name".
+ val defaultFSConf = sqlContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+ val defaultFS = if (defaultFSConf == null) {
+ new URI("")
+ } else {
+ new URI(defaultFSConf)
+ }
+
+ val scheme = if (uri.getScheme() != null) {
+ uri.getScheme()
+ } else {
+ defaultFS.getScheme()
+ }
+ val authority = if (uri.getAuthority() != null) {
+ uri.getAuthority()
+ } else {
+ defaultFS.getAuthority()
+ }
+
+ if (scheme == null) {
+ throw new AnalysisException(
+ "LOAD DATA with non-local path must specify URI Scheme.")
+ }
+
+ // Follow Hive's behavior:
+ // If LOCAL is not specified, and the path is relative,
+ // then the path is interpreted relative to "/user/<username>"
+ val uriPath = uri.getPath()
+ val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
+ uriPath
+ } else {
+ s"/user/${System.getProperty("user.name")}/$uriPath"
+ }
+ new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
+ }
+ }
+
+ if (partition.nonEmpty) {
+ catalog.loadPartition(
+ targetTable.identifier,
+ loadPath.toString,
+ partition.get,
+ isOverwrite,
+ holdDDLTime = false,
+ inheritTableSpecs = true,
+ isSkewedStoreAsSubdir = false)
+ } else {
+ catalog.loadTable(
+ targetTable.identifier,
+ loadPath.toString,
+ isOverwrite,
+ holdDDLTime = false)
+ }
+ Seq.empty[Row]
+ }
+}
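As a worked illustration of the non-local path resolution above (the default filesystem and user name are hypothetical): a relative path such as `data/file.txt` picks up its scheme and authority from `fs.default.name` and is rebased under `/user/<username>`, roughly:

```scala
import java.net.URI

// Hypothetical inputs: fs.default.name = "hdfs://namenode:8020", user "alice".
val defaultFS = new URI("hdfs://namenode:8020")
val raw = new URI("data/file.txt") // no scheme, no authority, relative path
val absolutePath = s"/user/alice/${raw.getPath}" // relative -> /user/<username>/...
val loadPath = new URI(
  defaultFS.getScheme, defaultFS.getAuthority, absolutePath, raw.getQuery, raw.getFragment)
// loadPath.toString == "hdfs://namenode:8020/user/alice/data/file.txt"
```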
/**
* Command that looks like