author | Liang-Chi Hsieh <simonh@tw.ibm.com> | 2016-04-22 18:26:28 +0800
---|---|---
committer | Wenchen Fan <wenchen@databricks.com> | 2016-04-22 18:26:28 +0800
commit | e09ab5da8b02da98d7b2496d549c1d53cceb8728 (patch) |
tree | f9aa98a66a2cfdb669d751d3f17d170e99eba0f8 /sql/hive |
parent | 284b15d2fbff7c0c3ffe8737838071d366ea5742 (diff) |
download | spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.tar.gz, spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.tar.bz2, spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.zip |
[SPARK-14609][SQL] Native support for LOAD DATA DDL command
## What changes were proposed in this pull request?
Add native support for the `LOAD DATA` DDL command, which loads data into a Hive table or partition.
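For illustration, a minimal usage sketch of the syntax this command covers (the session variable `spark`, the table `employees`, and the file paths below are hypothetical, not part of this patch):

```scala
// Hypothetical usage sketch, assuming a Hive-enabled SparkSession named `spark`
// and an existing Hive table `employees` partitioned by (ds STRING).

// LOCAL: copy a file from the local filesystem into the partition's directory.
spark.sql(
  "LOAD DATA LOCAL INPATH '/tmp/employee.dat' INTO TABLE employees PARTITION (ds='2016-04-22')")

// Non-LOCAL: move a file already on the default filesystem (e.g. HDFS);
// OVERWRITE replaces any existing data in the target partition.
spark.sql(
  "LOAD DATA INPATH '/data/employee.dat' OVERWRITE INTO TABLE employees PARTITION (ds='2016-04-22')")
```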
## How was this patch tested?
`HiveDDLCommandSuite` and `HiveQuerySuite`. In addition, a few existing Hive tests (`WindowQuerySuite`, `HiveTableScanSuite` and `HiveSerDeSuite`) also use the `LOAD DATA` command.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #12412 from viirya/ddl-load-data.
Diffstat (limited to 'sql/hive')
3 files changed, 193 insertions, 1 deletion
```diff
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f627384253..a92a94cae5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.util
+
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.hive.ql.metadata.HiveException
@@ -197,6 +199,46 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
     client.listTables(db, pattern)
   }
 
+  override def loadTable(
+      db: String,
+      table: String,
+      loadPath: String,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+    client.loadTable(
+      loadPath,
+      s"$db.$table",
+      isOverwrite,
+      holdDDLTime)
+  }
+
+  override def loadPartition(
+      db: String,
+      table: String,
+      loadPath: String,
+      partition: TablePartitionSpec,
+      isOverwrite: Boolean,
+      holdDDLTime: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean): Unit = withClient {
+    requireTableExists(db, table)
+
+    val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
+    getTable(db, table).partitionColumnNames.foreach { colName =>
+      orderedPartitionSpec.put(colName, partition(colName))
+    }
+
+    client.loadPartition(
+      loadPath,
+      s"$db.$table",
+      orderedPartitionSpec,
+      isOverwrite,
+      holdDDLTime,
+      inheritTableSpecs,
+      isSkewedStoreAsSubdir)
+  }
+
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index e3522567b9..f6e3a4bd2d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand, LoadData}
 import org.apache.spark.sql.hive.test.TestHive
 
 class HiveDDLCommandSuite extends PlanTest {
@@ -579,4 +579,29 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(source2.table == "table2")
   }
 
+  test("load data") {
+    val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1"
+    val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect {
+      case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+    }.head
+    assert(table.database.isEmpty)
+    assert(table.table == "table1")
+    assert(path == "path")
+    assert(!isLocal)
+    assert(!isOverwrite)
+    assert(partition.isEmpty)
+
+    val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')"
+    val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect {
+      case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+    }.head
+    assert(table2.database.isEmpty)
+    assert(table2.table == "table1")
+    assert(path2 == "path")
+    assert(isLocal2)
+    assert(isOverwrite2)
+    assert(partition2.nonEmpty)
+    assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
+  }
+
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 061d1512a5..014c1009ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -122,4 +122,129 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       checkAnswer(sql("SHOW TBLPROPERTIES parquet_temp"), Nil)
     }
   }
+
+  test("LOAD DATA") {
+    withTable("non_part_table", "part_table") {
+      sql(
+        """
+          |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
+          |ROW FORMAT DELIMITED
+          |FIELDS TERMINATED BY '|'
+          |LINES TERMINATED BY '\n'
+        """.stripMargin)
+
+      // employee.dat has two columns separated by '|', the first is an int, the second
+      // is a string. Its content looks like:
+      // 16|john
+      // 17|robert
+      val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+
+      // LOAD DATA INTO non-partitioned table can't specify partition
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+      }
+
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""")
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Nil)
+
+      sql(
+        """
+          |CREATE TABLE part_table (employeeID INT, employeeName STRING)
+          |PARTITIONED BY (c STRING, d STRING)
+          |ROW FORMAT DELIMITED
+          |FIELDS TERMINATED BY '|'
+          |LINES TERMINATED BY '\n'
+        """.stripMargin)
+
+      // LOAD DATA INTO partitioned table must specify partition
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table""")
+      }
+
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""")
+      }
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""")
+      }
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""")
+      }
+
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"),
+        sql("SELECT * FROM non_part_table").collect())
+
+      // Different order of partition columns.
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"),
+        sql("SELECT * FROM non_part_table").collect())
+    }
+  }
+
+  test("LOAD DATA: input path") {
+    withTable("non_part_table") {
+      sql(
+        """
+          |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
+          |ROW FORMAT DELIMITED
+          |FIELDS TERMINATED BY '|'
+          |LINES TERMINATED BY '\n'
+        """.stripMargin)
+
+      // Non-existing inpath
+      intercept[AnalysisException] {
+        sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""")
+      }
+
+      val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+
+      // Non-local inpath: without URI Scheme and Authority
+      sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Nil)
+
+      // Use URI as LOCAL inpath:
+      // file:/path/to/data/files/employee.dat
+      val uri = "file:" + testData
+      sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""")
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Row(16, "john") :: Nil)
+
+      // Use URI as non-LOCAL inpath
+      sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""")
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil)
+
+      sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""")
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Nil)
+
+      // Incorrect URI:
+      // file://path/to/data/files/employee.dat
+      val incorrectUri = "file:/" + testData
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
+      }
+
+      // Unset default URI Scheme and Authority: throw exception
+      val originalFsName = hiveContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+      hiveContext.sparkContext.hadoopConfiguration.unset("fs.default.name")
+      intercept[AnalysisException] {
+        sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
+      }
+      hiveContext.sparkContext.hadoopConfiguration.set("fs.default.name", originalFsName)
+    }
+  }
 }
```