author     Liang-Chi Hsieh <simonh@tw.ibm.com>    2016-04-22 18:26:28 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2016-04-22 18:26:28 +0800
commit     e09ab5da8b02da98d7b2496d549c1d53cceb8728
tree       f9aa98a66a2cfdb669d751d3f17d170e99eba0f8
parent     284b15d2fbff7c0c3ffe8737838071d366ea5742
[SPARK-14609][SQL] Native support for LOAD DATA DDL command
## What changes were proposed in this pull request?

Add native support for the LOAD DATA DDL command, which loads data into a Hive table or partition.

## How was this patch tested?

`HiveDDLCommandSuite` and `HiveQuerySuite`. In addition, a few Hive tests (`WindowQuerySuite`, `HiveTableScanSuite` and `HiveSerDeSuite`) also use the `LOAD DATA` command.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #12412 from viirya/ddl-load-data.
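For context, a minimal usage sketch of the newly supported syntax (not part of this patch; the table name, data file path, and partition value are hypothetical, and a Hive-enabled context as in the current codebase is assumed):

```scala
// Illustrative sketch only: names and paths below are made up for the example.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object LoadDataExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("load-data-example").setMaster("local[*]"))
    val hiveContext = new HiveContext(sc)

    // A simple delimited, partitioned Hive table, similar to the tables used in HiveCommandSuite.
    hiveContext.sql(
      """CREATE TABLE IF NOT EXISTS employees (id INT, name STRING)
        |PARTITIONED BY (ds STRING)
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'""".stripMargin)

    // Load a local file into a specific partition; with LOCAL the file must exist on the
    // driver's filesystem, and OVERWRITE replaces any existing data in that partition.
    hiveContext.sql(
      """LOAD DATA LOCAL INPATH '/tmp/employees.dat'
        |OVERWRITE INTO TABLE employees PARTITION (ds = '2016-04-22')""".stripMargin)

    hiveContext.sql("SELECT * FROM employees WHERE ds = '2016-04-22'").show()
    sc.stop()
  }
}
```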
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4              8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala  21
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala   34
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala        17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala              25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala             130
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala              42
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala              27
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala      125
11 files changed, 427 insertions(+), 8 deletions(-)
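The diffs below add `loadTable`/`loadPartition` to the catalog interfaces and a `LoadData` runnable command that drives them. As a rough illustration of the catalog-level API introduced here (an assumed sketch, not part of the patch; the catalog instance, database, paths, and partition values are hypothetical, while the parameter names mirror the signatures in the diff):

```scala
// Hypothetical caller of the new SessionCatalog methods added by this patch.
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

def loadExamples(catalog: SessionCatalog): Unit = {
  // Non-partitioned table: append the files under the given path into the table location.
  catalog.loadTable(
    TableIdentifier("employees", Some("default")),
    loadPath = "hdfs://namenode:8020/data/employees",
    isOverwrite = false,
    holdDDLTime = false)

  // Partitioned table: the partition spec must cover every partition column of the table.
  catalog.loadPartition(
    TableIdentifier("employees_part", Some("default")),
    loadPath = "hdfs://namenode:8020/data/employees",
    partition = Map("ds" -> "2016-04-22"),
    isOverwrite = true,
    holdDDLTime = false,
    inheritTableSpecs = true,
    isSkewedStoreAsSubdir = false)
}
```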
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6f104a1489..db453aaa6d 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -130,6 +130,8 @@ statement
| CACHE LAZY? TABLE identifier (AS? query)? #cacheTable
| UNCACHE TABLE identifier #uncacheTable
| CLEAR CACHE #clearCache
+ | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
+ tableIdentifier partitionSpec? #loadData
| ADD identifier .*? #addResource
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
@@ -147,7 +149,7 @@ hiveNativeCommands
| ROLLBACK WORK?
| SHOW PARTITIONS tableIdentifier partitionSpec?
| DFS .*?
- | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOAD) .*?
+ | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE) .*?
;
unsupportedHiveNativeCommands
@@ -651,7 +653,7 @@ nonReserved
| INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION | COMPUTE
| STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
| REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
- | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION
+ | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
| ASC | DESC | LIMIT | RENAME | SETS
;
@@ -881,6 +883,8 @@ INDEXES: 'INDEXES';
LOCKS: 'LOCKS';
OPTION: 'OPTION';
ANTI: 'ANTI';
+LOCAL: 'LOCAL';
+INPATH: 'INPATH';
STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index f8a6fb74cc..36f4f29068 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -205,6 +205,27 @@ class InMemoryCatalog extends ExternalCatalog {
StringUtils.filterPattern(listTables(db), pattern)
}
+ override def loadTable(
+ db: String,
+ table: String,
+ loadPath: String,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean): Unit = {
+ throw new AnalysisException("loadTable is not implemented for InMemoryCatalog.")
+ }
+
+ override def loadPartition(
+ db: String,
+ table: String,
+ loadPath: String,
+ partition: TablePartitionSpec,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit = {
+ throw new AnalysisException("loadPartition is not implemented for InMemoryCatalog.")
+ }
+
// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ab5124ea56..152bd499a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -187,6 +187,40 @@ class SessionCatalog(
externalCatalog.getTableOption(db, table)
}
+ /**
+ * Load files stored in given path into an existing metastore table.
+ * If no database is specified, assume the table is in the current database.
+ * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
+ */
+ def loadTable(
+ name: TableIdentifier,
+ loadPath: String,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean): Unit = {
+ val db = name.database.getOrElse(currentDb)
+ val table = formatTableName(name.table)
+ externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
+ }
+
+ /**
+ * Load files stored in given path into the partition of an existing metastore table.
+ * If no database is specified, assume the table is in the current database.
+ * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
+ */
+ def loadPartition(
+ name: TableIdentifier,
+ loadPath: String,
+ partition: TablePartitionSpec,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit = {
+ val db = name.database.getOrElse(currentDb)
+ val table = formatTableName(name.table)
+ externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime,
+ inheritTableSpecs, isSkewedStoreAsSubdir)
+ }
+
// -------------------------------------------------------------
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 75cbe32c16..fd5bcad0f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -100,6 +100,23 @@ abstract class ExternalCatalog {
def listTables(db: String, pattern: String): Seq[String]
+ def loadTable(
+ db: String,
+ table: String,
+ loadPath: String,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean): Unit
+
+ def loadPartition(
+ db: String,
+ table: String,
+ loadPath: String,
+ partition: TablePartitionSpec,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit
+
// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 9e69274311..e983a4cee6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -272,6 +272,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create a [[LoadData]] command.
+ *
+ * For example:
+ * {{{
+ * LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+ * [PARTITION (partcol1=val1, partcol2=val2 ...)]
+ * }}}
+ */
+ override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
+ LoadData(
+ table = visitTableIdentifier(ctx.tableIdentifier),
+ path = string(ctx.path),
+ isLocal = ctx.LOCAL != null,
+ isOverwrite = ctx.OVERWRITE != null,
+ partition = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
+ )
+ }
+
+ /**
* Convert a table property list into a key-value map.
*/
override def visitTablePropertyList(
@@ -954,6 +973,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
/**
* Create a [[CreateTableLike]] command.
+ *
+ * For example:
+ * {{{
+ * CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+ * LIKE [other_db_name.]existing_table_name
+ * }}}
*/
override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
val targetTable = visitTableIdentifier(ctx.target)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fc37a142cd..85f0066f3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -508,7 +508,7 @@ case class AlterTableReplaceCol(
extends NativeDDLCommand(sql) with Logging
-private object DDLUtils {
+private[sql] object DDLUtils {
def isDatasourceTable(props: Map[String, String]): Boolean = {
props.contains("spark.sql.sources.provider")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 43fb38484d..11612c20d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -17,15 +17,18 @@
package org.apache.spark.sql.execution.command
+import java.io.File
+import java.net.URI
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.types.{MetadataBuilder, StringType}
-
+import org.apache.spark.util.Utils
case class CreateTableAsSelectLogicalPlan(
tableDesc: CatalogTable,
@@ -139,6 +142,129 @@ case class AlterTableRename(
}
+/**
+ * A command that loads data into a Hive table.
+ *
+ * The syntax of this command is:
+ * {{{
+ * LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+ * [PARTITION (partcol1=val1, partcol2=val2 ...)]
+ * }}}
+ */
+case class LoadData(
+ table: TableIdentifier,
+ path: String,
+ isLocal: Boolean,
+ isOverwrite: Boolean,
+ partition: Option[ExternalCatalog.TablePartitionSpec]) extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ if (!catalog.tableExists(table)) {
+ throw new AnalysisException(
+ s"Table in LOAD DATA does not exist: '$table'")
+ }
+
+ val targetTable = catalog.getTableMetadataOption(table).getOrElse {
+ throw new AnalysisException(
+ s"Table in LOAD DATA cannot be temporary: '$table'")
+ }
+
+ if (DDLUtils.isDatasourceTable(targetTable)) {
+ throw new AnalysisException(
+ "LOAD DATA is not supported for datasource tables")
+ }
+
+ if (targetTable.partitionColumnNames.nonEmpty) {
+ if (partition.isEmpty || targetTable.partitionColumnNames.size != partition.get.size) {
+ throw new AnalysisException(
+ "LOAD DATA to partitioned table must specify a specific partition of " +
+ "the table by specifying values for all of the partitioning columns.")
+ }
+
+ partition.get.keys.foreach { colName =>
+ if (!targetTable.partitionColumnNames.contains(colName)) {
+ throw new AnalysisException(
+ s"LOAD DATA to partitioned table specifies a non-existing partition column: '$colName'")
+ }
+ }
+ } else {
+ if (partition.nonEmpty) {
+ throw new AnalysisException(
+ "LOAD DATA to non-partitioned table cannot specify partition.")
+ }
+ }
+
+ val loadPath =
+ if (isLocal) {
+ val uri = Utils.resolveURI(path)
+ if (!new File(uri.getPath()).exists()) {
+ throw new AnalysisException(s"LOAD DATA with non-existing path: $path")
+ }
+ uri
+ } else {
+ val uri = new URI(path)
+ if (uri.getScheme() != null && uri.getAuthority() != null) {
+ uri
+ } else {
+ // Follow Hive's behavior:
+ // If no schema or authority is provided with non-local inpath,
+ // we will use hadoop configuration "fs.default.name".
+ val defaultFSConf = sqlContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+ val defaultFS = if (defaultFSConf == null) {
+ new URI("")
+ } else {
+ new URI(defaultFSConf)
+ }
+
+ val scheme = if (uri.getScheme() != null) {
+ uri.getScheme()
+ } else {
+ defaultFS.getScheme()
+ }
+ val authority = if (uri.getAuthority() != null) {
+ uri.getAuthority()
+ } else {
+ defaultFS.getAuthority()
+ }
+
+ if (scheme == null) {
+ throw new AnalysisException(
+ "LOAD DATA with non-local path must specify URI Scheme.")
+ }
+
+ // Follow Hive's behavior:
+ // If LOCAL is not specified, and the path is relative,
+ // then the path is interpreted relative to "/user/<username>"
+ val uriPath = uri.getPath()
+ val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
+ uriPath
+ } else {
+ s"/user/${System.getProperty("user.name")}/$uriPath"
+ }
+ new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
+ }
+ }
+
+ if (partition.nonEmpty) {
+ catalog.loadPartition(
+ targetTable.identifier,
+ loadPath.toString,
+ partition.get,
+ isOverwrite,
+ holdDDLTime = false,
+ inheritTableSpecs = true,
+ isSkewedStoreAsSubdir = false)
+ } else {
+ catalog.loadTable(
+ targetTable.identifier,
+ loadPath.toString,
+ isOverwrite,
+ holdDDLTime = false)
+ }
+ Seq.empty[Row]
+ }
+}
/**
* Command that looks like
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 0d0f556d9e..cc8b41542e 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -166,7 +166,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"SHOW TABLES;"
-> "hive_test",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;"
- -> "OK",
+ -> "",
"CACHE TABLE hive_test;"
-> "",
"SELECT COUNT(*) FROM hive_test;"
@@ -214,7 +214,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"CREATE TABLE sourceTable (key INT, val STRING);"
-> "",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
- -> "OK",
+ -> "",
"INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
-> "",
"SELECT count(key) FROM t1;"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f627384253..a92a94cae5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import java.util
+
import scala.util.control.NonFatal
import org.apache.hadoop.hive.ql.metadata.HiveException
@@ -197,6 +199,46 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
client.listTables(db, pattern)
}
+ override def loadTable(
+ db: String,
+ table: String,
+ loadPath: String,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean): Unit = withClient {
+ requireTableExists(db, table)
+ client.loadTable(
+ loadPath,
+ s"$db.$table",
+ isOverwrite,
+ holdDDLTime)
+ }
+
+ override def loadPartition(
+ db: String,
+ table: String,
+ loadPath: String,
+ partition: TablePartitionSpec,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit = withClient {
+ requireTableExists(db, table)
+
+ val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
+ getTable(db, table).partitionColumnNames.foreach { colName =>
+ orderedPartitionSpec.put(colName, partition(colName))
+ }
+
+ client.loadPartition(
+ loadPath,
+ s"$db.$table",
+ orderedPartitionSpec,
+ isOverwrite,
+ holdDDLTime,
+ inheritTableSpecs,
+ isSkewedStoreAsSubdir)
+ }
+
// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index e3522567b9..f6e3a4bd2d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand, LoadData}
import org.apache.spark.sql.hive.test.TestHive
class HiveDDLCommandSuite extends PlanTest {
@@ -579,4 +579,29 @@ class HiveDDLCommandSuite extends PlanTest {
assert(source2.table == "table2")
}
+ test("load data") {
+ val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1"
+ val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect {
+ case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+ }.head
+ assert(table.database.isEmpty)
+ assert(table.table == "table1")
+ assert(path == "path")
+ assert(!isLocal)
+ assert(!isOverwrite)
+ assert(partition.isEmpty)
+
+ val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')"
+ val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect {
+ case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+ }.head
+ assert(table2.database.isEmpty)
+ assert(table2.table == "table1")
+ assert(path2 == "path")
+ assert(isLocal2)
+ assert(isOverwrite2)
+ assert(partition2.nonEmpty)
+ assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
+ }
+
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 061d1512a5..014c1009ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -122,4 +122,129 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
checkAnswer(sql("SHOW TBLPROPERTIES parquet_temp"), Nil)
}
}
+
+ test("LOAD DATA") {
+ withTable("non_part_table", "part_table") {
+ sql(
+ """
+ |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
+ |ROW FORMAT DELIMITED
+ |FIELDS TERMINATED BY '|'
+ |LINES TERMINATED BY '\n'
+ """.stripMargin)
+
+ // employee.dat has two columns separated by '|', the first is an int, the second is a string.
+ // Its content looks like:
+ // 16|john
+ // 17|robert
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+
+ // LOAD DATA INTO non-partitioned table can't specify partition
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+ }
+
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""")
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Nil)
+
+ sql(
+ """
+ |CREATE TABLE part_table (employeeID INT, employeeName STRING)
+ |PARTITIONED BY (c STRING, d STRING)
+ |ROW FORMAT DELIMITED
+ |FIELDS TERMINATED BY '|'
+ |LINES TERMINATED BY '\n'
+ """.stripMargin)
+
+ // LOAD DATA INTO partitioned table must specify partition
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table""")
+ }
+
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""")
+ }
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""")
+ }
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""")
+ }
+
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""")
+ checkAnswer(
+ sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"),
+ sql("SELECT * FROM non_part_table").collect())
+
+ // Different order of partition columns.
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""")
+ checkAnswer(
+ sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"),
+ sql("SELECT * FROM non_part_table").collect())
+ }
+ }
+
+ test("LOAD DATA: input path") {
+ withTable("non_part_table") {
+ sql(
+ """
+ |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
+ |ROW FORMAT DELIMITED
+ |FIELDS TERMINATED BY '|'
+ |LINES TERMINATED BY '\n'
+ """.stripMargin)
+
+ // Non-existing inpath
+ intercept[AnalysisException] {
+ sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""")
+ }
+
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+
+ // Non-local inpath: without URI Scheme and Authority
+ sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Nil)
+
+ // Use URI as LOCAL inpath:
+ // file:/path/to/data/files/employee.dat
+ val uri = "file:" + testData
+ sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""")
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Row(16, "john") :: Nil)
+
+ // Use URI as non-LOCAL inpath
+ sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""")
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil)
+
+ sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""")
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Nil)
+
+ // Incorrect URI:
+ // file://path/to/data/files/employee.dat
+ val incorrectUri = "file:/" + testData
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
+ }
+
+ // Unset default URI Scheme and Authority: throw exception
+ val originalFsName = hiveContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+ hiveContext.sparkContext.hadoopConfiguration.unset("fs.default.name")
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
+ }
+ hiveContext.sparkContext.hadoopConfiguration.set("fs.default.name", originalFsName)
+ }
+ }
}