author     Liang-Chi Hsieh <simonh@tw.ibm.com>      2016-04-22 18:26:28 +0800
committer  Wenchen Fan <wenchen@databricks.com>     2016-04-22 18:26:28 +0800
commit     e09ab5da8b02da98d7b2496d549c1d53cceb8728 (patch)
tree       f9aa98a66a2cfdb669d751d3f17d170e99eba0f8 /sql/hive
parent     284b15d2fbff7c0c3ffe8737838071d366ea5742 (diff)
[SPARK-14609][SQL] Native support for LOAD DATA DDL command
## What changes were proposed in this pull request?

Add native support for the LOAD DATA DDL command, which loads data into a Hive table or partition.

## How was this patch tested?

`HiveDDLCommandSuite` and `HiveQuerySuite`. Besides these, a few Hive tests (`WindowQuerySuite`, `HiveTableScanSuite` and `HiveSerDeSuite`) also use the `LOAD DATA` command.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #12412 from viirya/ddl-load-data.
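For context, a minimal sketch of how the natively supported statement is used from Spark SQL. The table names, file path, and the `HiveContext` setup below are illustrative only and are not part of this patch; in the test suites changed here, `sql` and `hiveContext` come from `TestHiveSingleton` instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Illustrative local setup (assumed): a Hive-enabled SQL context.
val sc = new SparkContext(new SparkConf().setAppName("load-data-example").setMaster("local[*]"))
val hiveContext = new HiveContext(sc)

// Non-partitioned table: a PARTITION clause is not allowed here.
hiveContext.sql(
  """CREATE TABLE IF NOT EXISTS employees (employeeID INT, employeeName STRING)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
  """.stripMargin)
hiveContext.sql("LOAD DATA LOCAL INPATH '/tmp/employee.dat' INTO TABLE employees")

// Partitioned table: the PARTITION clause must cover every partition column.
hiveContext.sql(
  """CREATE TABLE IF NOT EXISTS employees_by_day (employeeID INT, employeeName STRING)
    |PARTITIONED BY (ds STRING)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
  """.stripMargin)
hiveContext.sql(
  """LOAD DATA LOCAL INPATH '/tmp/employee.dat'
    |OVERWRITE INTO TABLE employees_by_day PARTITION (ds = '2016-04-22')
  """.stripMargin)
```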
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala         |  42
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala         |  27
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala  | 125
3 files changed, 193 insertions(+), 1 deletion(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f627384253..a92a94cae5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import java.util
+
import scala.util.control.NonFatal
import org.apache.hadoop.hive.ql.metadata.HiveException
@@ -197,6 +199,46 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
client.listTables(db, pattern)
}
+ override def loadTable(
+ db: String,
+ table: String,
+ loadPath: String,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean): Unit = withClient {
+ requireTableExists(db, table)
+ client.loadTable(
+ loadPath,
+ s"$db.$table",
+ isOverwrite,
+ holdDDLTime)
+ }
+
+ override def loadPartition(
+ db: String,
+ table: String,
+ loadPath: String,
+ partition: TablePartitionSpec,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit = withClient {
+ requireTableExists(db, table)
+
+ val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
+ getTable(db, table).partitionColumnNames.foreach { colName =>
+ orderedPartitionSpec.put(colName, partition(colName))
+ }
+
+ client.loadPartition(
+ loadPath,
+ s"$db.$table",
+ orderedPartitionSpec,
+ isOverwrite,
+ holdDDLTime,
+ inheritTableSpecs,
+ isSkewedStoreAsSubdir)
+ }
+
// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index e3522567b9..f6e3a4bd2d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand, LoadData}
import org.apache.spark.sql.hive.test.TestHive
class HiveDDLCommandSuite extends PlanTest {
@@ -579,4 +579,29 @@ class HiveDDLCommandSuite extends PlanTest {
assert(source2.table == "table2")
}
+ test("load data") {
+ val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1"
+ val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect {
+ case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+ }.head
+ assert(table.database.isEmpty)
+ assert(table.table == "table1")
+ assert(path == "path")
+ assert(!isLocal)
+ assert(!isOverwrite)
+ assert(partition.isEmpty)
+
+ val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')"
+ val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect {
+ case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+ }.head
+ assert(table2.database.isEmpty)
+ assert(table2.table == "table1")
+ assert(path2 == "path")
+ assert(isLocal2)
+ assert(isOverwrite2)
+ assert(partition2.nonEmpty)
+ assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
+ }
+
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 061d1512a5..014c1009ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -122,4 +122,129 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
checkAnswer(sql("SHOW TBLPROPERTIES parquet_temp"), Nil)
}
}
+
+ test("LOAD DATA") {
+ withTable("non_part_table", "part_table") {
+ sql(
+ """
+ |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
+ |ROW FORMAT DELIMITED
+ |FIELDS TERMINATED BY '|'
+ |LINES TERMINATED BY '\n'
+ """.stripMargin)
+
+ // employee.dat has two columns separated by '|', the first is an int, the second is a string.
+ // Its content looks like:
+ // 16|john
+ // 17|robert
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+
+ // LOAD DATA INTO non-partitioned table can't specify partition
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+ }
+
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""")
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Nil)
+
+ sql(
+ """
+ |CREATE TABLE part_table (employeeID INT, employeeName STRING)
+ |PARTITIONED BY (c STRING, d STRING)
+ |ROW FORMAT DELIMITED
+ |FIELDS TERMINATED BY '|'
+ |LINES TERMINATED BY '\n'
+ """.stripMargin)
+
+ // LOAD DATA INTO partitioned table must specify partition
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table""")
+ }
+
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""")
+ }
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""")
+ }
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""")
+ }
+
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""")
+ checkAnswer(
+ sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"),
+ sql("SELECT * FROM non_part_table").collect())
+
+ // Different order of partition columns.
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""")
+ checkAnswer(
+ sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"),
+ sql("SELECT * FROM non_part_table").collect())
+ }
+ }
+
+ test("LOAD DATA: input path") {
+ withTable("non_part_table") {
+ sql(
+ """
+ |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
+ |ROW FORMAT DELIMITED
+ |FIELDS TERMINATED BY '|'
+ |LINES TERMINATED BY '\n'
+ """.stripMargin)
+
+ // Non-existing inpath
+ intercept[AnalysisException] {
+ sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""")
+ }
+
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+
+ // Non-local inpath: without URI Scheme and Authority
+ sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Nil)
+
+ // Use URI as LOCAL inpath:
+ // file:/path/to/data/files/employee.dat
+ val uri = "file:" + testData
+ sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""")
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Row(16, "john") :: Nil)
+
+ // Use URI as non-LOCAL inpath
+ sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""")
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil)
+
+ sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""")
+
+ checkAnswer(
+ sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+ Row(16, "john") :: Nil)
+
+ // Incorrect URI:
+ // file://path/to/data/files/employee.dat
+ val incorrectUri = "file:/" + testData
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
+ }
+
+ // Unset default URI Scheme and Authority: throw exception
+ val originalFsName = hiveContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+ hiveContext.sparkContext.hadoopConfiguration.unset("fs.default.name")
+ intercept[AnalysisException] {
+ sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
+ }
+ hiveContext.sparkContext.hadoopConfiguration.set("fs.default.name", originalFsName)
+ }
+ }
}