| Field | Value | Date |
|---|---|---|
| author | Liang-Chi Hsieh <simonh@tw.ibm.com> | 2016-04-22 18:26:28 +0800 |
| committer | Wenchen Fan <wenchen@databricks.com> | 2016-04-22 18:26:28 +0800 |
| commit | e09ab5da8b02da98d7b2496d549c1d53cceb8728 | |
| tree | f9aa98a66a2cfdb669d751d3f17d170e99eba0f8 | |
| parent | 284b15d2fbff7c0c3ffe8737838071d366ea5742 | |
# [SPARK-14609][SQL] Native support for LOAD DATA DDL command
## What changes were proposed in this pull request?
This adds native support for the `LOAD DATA` DDL command, which loads data into a Hive table or partition.
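For illustration, here is a minimal sketch of the statements this enables. Table names and file paths are made up for the example, and it assumes a Hive-enabled context from this era of the API:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical setup; any Hive-enabled SQLContext works.
val sc = new SparkContext(
  new SparkConf().setAppName("load-data-demo").setMaster("local[2]"))
val hiveContext = new HiveContext(sc)

hiveContext.sql("CREATE TABLE users (id INT, name STRING)")

// Load a file from the local filesystem into a non-partitioned table.
hiveContext.sql("LOAD DATA LOCAL INPATH '/tmp/users.dat' INTO TABLE users")

// Load from the default filesystem (e.g. HDFS), replacing one partition
// of a (hypothetical) partitioned table; values must be supplied for
// every partitioning column, otherwise analysis fails.
hiveContext.sql(
  "LOAD DATA INPATH '/data/users.dat' OVERWRITE " +
    "INTO TABLE users_by_day PARTITION (ds='2016-04-22')")
```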
## How was this patch tested?
`HiveDDLCommandSuite` and `HiveQuerySuite`. In addition, a few existing Hive tests (`WindowQuerySuite`, `HiveTableScanSuite`, and `HiveSerDeSuite`) also exercise the `LOAD DATA` command.
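As a rough sketch of the validation those suites cover (ScalaTest-style; table names continue the hypothetical example above), a partition spec must match the target table exactly:

```scala
import org.apache.spark.sql.AnalysisException
import org.scalatest.Assertions._

// A non-partitioned table cannot take a PARTITION clause.
intercept[AnalysisException] {
  hiveContext.sql(
    "LOAD DATA LOCAL INPATH '/tmp/users.dat' INTO TABLE users PARTITION (ds='1')")
}

// A partitioned table requires values for every partitioning column.
intercept[AnalysisException] {
  hiveContext.sql(
    "LOAD DATA LOCAL INPATH '/tmp/users.dat' INTO TABLE users_by_day")
}
```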
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #12412 from viirya/ddl-load-data.
11 files changed, 427 insertions, 8 deletions
```diff
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6f104a1489..db453aaa6d 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -130,6 +130,8 @@ statement
     | CACHE LAZY? TABLE identifier (AS? query)?                        #cacheTable
     | UNCACHE TABLE identifier                                         #uncacheTable
     | CLEAR CACHE                                                      #clearCache
+    | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
+        tableIdentifier partitionSpec?                                 #loadData
     | ADD identifier .*?                                               #addResource
     | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
@@ -147,7 +149,7 @@ hiveNativeCommands
     | ROLLBACK WORK?
     | SHOW PARTITIONS tableIdentifier partitionSpec?
     | DFS .*?
-    | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOAD) .*?
+    | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE) .*?
     ;

 unsupportedHiveNativeCommands
@@ -651,7 +653,7 @@ nonReserved
     | INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION
     | COMPUTE | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER | REVOKE
     | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
-    | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION
+    | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
     | ASC | DESC | LIMIT | RENAME | SETS
     ;
@@ -881,6 +883,8 @@ INDEXES: 'INDEXES';
 LOCKS: 'LOCKS';
 OPTION: 'OPTION';
 ANTI: 'ANTI';
+LOCAL: 'LOCAL';
+INPATH: 'INPATH';

 STRING
     : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index f8a6fb74cc..36f4f29068 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -205,6 +205,27 @@ class InMemoryCatalog extends ExternalCatalog {
     StringUtils.filterPattern(listTables(db), pattern)
   }

+  override def loadTable(
+      db: String,
+      table: String,
+      loadPath: String,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean): Unit = {
+    throw new AnalysisException("loadTable is not implemented for InMemoryCatalog.")
+  }
+
+  override def loadPartition(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean): Unit = {
+    throw new AnalysisException("loadPartition is not implemented for InMemoryCatalog.")
+  }
+
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ab5124ea56..152bd499a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -187,6 +187,40 @@ class SessionCatalog(
     externalCatalog.getTableOption(db, table)
   }

+  /**
+   * Load files stored in given path into an existing metastore table.
+   * If no database is specified, assume the table is in the current database.
+   * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
+   */
+  def loadTable(
+      name: TableIdentifier,
+      loadPath: String,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean): Unit = {
+    val db = name.database.getOrElse(currentDb)
+    val table = formatTableName(name.table)
+    externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
+  }
+
+  /**
+   * Load files stored in given path into the partition of an existing metastore table.
+   * If no database is specified, assume the table is in the current database.
+   * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
+   */
+  def loadPartition(
+      name: TableIdentifier,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean): Unit = {
+    val db = name.database.getOrElse(currentDb)
+    val table = formatTableName(name.table)
+    externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime,
+      inheritTableSpecs, isSkewedStoreAsSubdir)
+  }
+
   // -------------------------------------------------------------
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 75cbe32c16..fd5bcad0f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -100,6 +100,23 @@ abstract class ExternalCatalog {

   def listTables(db: String, pattern: String): Seq[String]

+  def loadTable(
+      db: String,
+      table: String,
+      loadPath: String,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean): Unit
+
+  def loadPartition(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean): Unit
+
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 9e69274311..e983a4cee6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -272,6 +272,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }

   /**
+   * Create a [[LoadData]] command.
+   *
+   * For example:
+   * {{{
+   *   LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+   *   [PARTITION (partcol1=val1, partcol2=val2 ...)]
+   * }}}
+   */
+  override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
+    LoadData(
+      table = visitTableIdentifier(ctx.tableIdentifier),
+      path = string(ctx.path),
+      isLocal = ctx.LOCAL != null,
+      isOverwrite = ctx.OVERWRITE != null,
+      partition = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
+    )
+  }
+
+  /**
    * Convert a table property list into a key-value map.
    */
   override def visitTablePropertyList(
@@ -954,6 +973,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

   /**
    * Create a [[CreateTableLike]] command.
+   *
+   * For example:
+   * {{{
+   *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   LIKE [other_db_name.]existing_table_name
+   * }}}
    */
   override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
     val targetTable = visitTableIdentifier(ctx.target)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fc37a142cd..85f0066f3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -508,7 +508,7 @@ case class AlterTableReplaceCol(
   extends NativeDDLCommand(sql) with Logging

-private object DDLUtils {
+private[sql] object DDLUtils {

   def isDatasourceTable(props: Map[String, String]): Boolean = {
     props.contains("spark.sql.sources.provider")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 43fb38484d..11612c20d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -17,15 +17,18 @@

 package org.apache.spark.sql.execution.command

+import java.io.File
+import java.net.URI
+
 import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.types.{MetadataBuilder, StringType}
-
+import org.apache.spark.util.Utils

 case class CreateTableAsSelectLogicalPlan(
     tableDesc: CatalogTable,
@@ -139,6 +142,129 @@ case class AlterTableRename(
   }
 }

+/**
+ * A command that loads data into a Hive table.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+ *   [PARTITION (partcol1=val1, partcol2=val2 ...)]
+ * }}}
+ */
+case class LoadData(
+    table: TableIdentifier,
+    path: String,
+    isLocal: Boolean,
+    isOverwrite: Boolean,
+    partition: Option[ExternalCatalog.TablePartitionSpec]) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    if (!catalog.tableExists(table)) {
+      throw new AnalysisException(
+        s"Table in LOAD DATA does not exist: '$table'")
+    }
+
+    val targetTable = catalog.getTableMetadataOption(table).getOrElse {
+      throw new AnalysisException(
+        s"Table in LOAD DATA cannot be temporary: '$table'")
+    }
+
+    if (DDLUtils.isDatasourceTable(targetTable)) {
+      throw new AnalysisException(
+        "LOAD DATA is not supported for datasource tables")
+    }
+
+    if (targetTable.partitionColumnNames.nonEmpty) {
+      if (partition.isEmpty || targetTable.partitionColumnNames.size != partition.get.size) {
+        throw new AnalysisException(
+          "LOAD DATA to partitioned table must specify a specific partition of " +
+          "the table by specifying values for all of the partitioning columns.")
+      }
+
+      partition.get.keys.foreach { colName =>
+        if (!targetTable.partitionColumnNames.contains(colName)) {
+          throw new AnalysisException(
+            s"LOAD DATA to partitioned table specifies a non-existing partition column: '$colName'")
+        }
+      }
+    } else {
+      if (partition.nonEmpty) {
+        throw new AnalysisException(
+          "LOAD DATA to non-partitioned table cannot specify partition.")
+      }
+    }
+
+    val loadPath =
+      if (isLocal) {
+        val uri = Utils.resolveURI(path)
+        if (!new File(uri.getPath()).exists()) {
+          throw new AnalysisException(s"LOAD DATA with non-existing path: $path")
+        }
+        uri
+      } else {
+        val uri = new URI(path)
+        if (uri.getScheme() != null && uri.getAuthority() != null) {
+          uri
+        } else {
+          // Follow Hive's behavior:
+          // If no schema or authority is provided with non-local inpath,
+          // we will use hadoop configuration "fs.default.name".
+          val defaultFSConf = sqlContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+          val defaultFS = if (defaultFSConf == null) {
+            new URI("")
+          } else {
+            new URI(defaultFSConf)
+          }
+
+          val scheme = if (uri.getScheme() != null) {
+            uri.getScheme()
+          } else {
+            defaultFS.getScheme()
+          }
+          val authority = if (uri.getAuthority() != null) {
+            uri.getAuthority()
+          } else {
+            defaultFS.getAuthority()
+          }
+
+          if (scheme == null) {
+            throw new AnalysisException(
+              "LOAD DATA with non-local path must specify URI Scheme.")
+          }
+
+          // Follow Hive's behavior:
+          // If LOCAL is not specified, and the path is relative,
+          // then the path is interpreted relative to "/user/<username>"
+          val uriPath = uri.getPath()
+          val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
+            uriPath
+          } else {
+            s"/user/${System.getProperty("user.name")}/$uriPath"
+          }
+          new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
+        }
+      }
+
+    if (partition.nonEmpty) {
+      catalog.loadPartition(
+        targetTable.identifier,
+        loadPath.toString,
+        partition.get,
+        isOverwrite,
+        holdDDLTime = false,
+        inheritTableSpecs = true,
+        isSkewedStoreAsSubdir = false)
+    } else {
+      catalog.loadTable(
+        targetTable.identifier,
+        loadPath.toString,
+        isOverwrite,
+        holdDDLTime = false)
+    }
+    Seq.empty[Row]
+  }
+}

 /**
  * Command that looks like
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 0d0f556d9e..cc8b41542e 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -166,7 +166,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       "SHOW TABLES;"
         -> "hive_test",
       s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;"
-        -> "OK",
+        -> "",
       "CACHE TABLE hive_test;"
         -> "",
       "SELECT COUNT(*) FROM hive_test;"
@@ -214,7 +214,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       "CREATE TABLE sourceTable (key INT, val STRING);"
         -> "",
       s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
-        -> "OK",
+        -> "",
       "INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
         -> "",
       "SELECT count(key) FROM t1;"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f627384253..a92a94cae5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.hive

+import java.util
+
 import scala.util.control.NonFatal

 import org.apache.hadoop.hive.ql.metadata.HiveException
@@ -197,6 +199,46 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
     client.listTables(db, pattern)
   }

+  override def loadTable(
+      db: String,
+      table: String,
+      loadPath: String,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    client.loadTable(
+      loadPath,
+      s"$db.$table",
+      isOverwrite,
+      holdDDLTime)
+  }
+
+  override def loadPartition(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+
+    val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
+    getTable(db, table).partitionColumnNames.foreach { colName =>
+      orderedPartitionSpec.put(colName, partition(colName))
+    }
+
+    client.loadPartition(
+      loadPath,
+      s"$db.$table",
+      orderedPartitionSpec,
+      isOverwrite,
+      holdDDLTime,
+      inheritTableSpecs,
+      isSkewedStoreAsSubdir)
+  }
+
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index e3522567b9..f6e3a4bd2d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand, LoadData}
 import org.apache.spark.sql.hive.test.TestHive

 class HiveDDLCommandSuite extends PlanTest {
@@ -579,4 +579,29 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(source2.table == "table2")
   }

+  test("load data") {
+    val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1"
+    val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect {
+      case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+    }.head
+    assert(table.database.isEmpty)
+    assert(table.table == "table1")
+    assert(path == "path")
+    assert(!isLocal)
+    assert(!isOverwrite)
+    assert(partition.isEmpty)
+
+    val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')"
+    val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect {
+      case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+    }.head
+    assert(table2.database.isEmpty)
+    assert(table2.table == "table1")
+    assert(path2 == "path")
+    assert(isLocal2)
+    assert(isOverwrite2)
+    assert(partition2.nonEmpty)
+    assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
+  }
+
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 061d1512a5..014c1009ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -122,4 +122,129 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       checkAnswer(sql("SHOW TBLPROPERTIES parquet_temp"), Nil)
     }
   }
+
+  test("LOAD DATA") {
+    withTable("non_part_table", "part_table") {
+      sql(
+        """
+          |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
+          |ROW FORMAT DELIMITED
+          |FIELDS TERMINATED BY '|'
+          |LINES TERMINATED BY '\n'
+        """.stripMargin)
+
+      // employee.dat has two columns separated by '|', the first is an int, the second is a string.
+      // Its content looks like:
+      // 16|john
+      // 17|robert
+      val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+
+      // LOAD DATA INTO non-partitioned table can't specify partition
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+      }
+
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""")
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Nil)
+
+      sql(
+        """
+          |CREATE TABLE part_table (employeeID INT, employeeName STRING)
+          |PARTITIONED BY (c STRING, d STRING)
+          |ROW FORMAT DELIMITED
+          |FIELDS TERMINATED BY '|'
+          |LINES TERMINATED BY '\n'
+        """.stripMargin)
+
+      // LOAD DATA INTO partitioned table must specify partition
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table""")
+      }
+
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""")
+      }
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""")
+      }
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""")
+      }
+
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"),
+        sql("SELECT * FROM non_part_table").collect())
+
+      // Different order of partition columns.
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"),
+        sql("SELECT * FROM non_part_table").collect())
+    }
+  }
+
+  test("LOAD DATA: input path") {
+    withTable("non_part_table") {
+      sql(
+        """
+          |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
+          |ROW FORMAT DELIMITED
+          |FIELDS TERMINATED BY '|'
+          |LINES TERMINATED BY '\n'
+        """.stripMargin)
+
+      // Non-existing inpath
+      intercept[AnalysisException] {
+        sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""")
+      }
+
+      val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+
+      // Non-local inpath: without URI Scheme and Authority
+      sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Nil)
+
+      // Use URI as LOCAL inpath:
+      // file:/path/to/data/files/employee.dat
+      val uri = "file:" + testData
+      sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""")
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Row(16, "john") :: Nil)
+
+      // Use URI as non-LOCAL inpath
+      sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""")
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil)
+
+      sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""")
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Nil)
+
+      // Incorrect URI:
+      // file://path/to/data/files/employee.dat
+      val incorrectUri = "file:/" + testData
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
+      }
+
+      // Unset default URI Scheme and Authority: throw exception
+      val originalFsName = hiveContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+      hiveContext.sparkContext.hadoopConfiguration.unset("fs.default.name")
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
+      }
+      hiveContext.sparkContext.hadoopConfiguration.set("fs.default.name", originalFsName)
+    }
+  }
 }
```
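For readers following the path handling in `LoadData.run` above, here is a standalone sketch of the non-local resolution rules: a simplified re-implementation for illustration, not the code that ships, and the helper name is made up. It fills a missing scheme or authority from `fs.default.name`, then absolutizes relative paths against `/user/<username>`, following Hive:

```scala
import java.net.URI

// Hypothetical helper mirroring the non-local branch of LoadData.run.
def resolveNonLocalPath(path: String, defaultFSConf: Option[String]): URI = {
  val uri = new URI(path)
  if (uri.getScheme != null && uri.getAuthority != null) {
    uri // Fully qualified: used as-is.
  } else {
    // Fall back to the configured default filesystem for missing parts.
    val defaultFS = defaultFSConf.map(new URI(_)).getOrElse(new URI(""))
    val scheme = Option(uri.getScheme).getOrElse(defaultFS.getScheme)
    val authority = Option(uri.getAuthority).getOrElse(defaultFS.getAuthority)
    require(scheme != null, "LOAD DATA with non-local path must specify URI Scheme.")

    // Relative paths are resolved against /user/<username>, as Hive does.
    val uriPath = uri.getPath
    val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
      uriPath
    } else {
      s"/user/${System.getProperty("user.name")}/$uriPath"
    }
    new URI(scheme, authority, absolutePath, uri.getQuery, uri.getFragment)
  }
}

// e.g. with fs.default.name = hdfs://namenode:8020 and user "alice":
//   resolveNonLocalPath("data/users.dat", Some("hdfs://namenode:8020"))
//   => hdfs://namenode:8020/user/alice/data/users.dat
```

The shipped command throws `AnalysisException` rather than `require`'s `IllegalArgumentException`; otherwise this mirrors the branch above.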