about summary refs log tree commit diff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <simonh@tw.ibm.com>2016-04-22 18:26:28 +0800
committerWenchen Fan <wenchen@databricks.com>2016-04-22 18:26:28 +0800
commite09ab5da8b02da98d7b2496d549c1d53cceb8728 (patch)
treef9aa98a66a2cfdb669d751d3f17d170e99eba0f8 /sql/catalyst
parent284b15d2fbff7c0c3ffe8737838071d366ea5742 (diff)
downloadspark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.tar.gz
spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.tar.bz2
spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.zip
[SPARK-14609][SQL] Native support for LOAD DATA DDL command
## What changes were proposed in this pull request? Add the native support for LOAD DATA DDL command that loads data into Hive table/partition. ## How was this patch tested? `HiveDDLCommandSuite` and `HiveQuerySuite`. Besides, few Hive tests (`WindowQuerySuite`, `HiveTableScanSuite` and `HiveSerDeSuite`) also use `LOAD DATA` command. Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #12412 from viirya/ddl-load-data.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 8
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 21
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 34
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 17
4 files changed, 78 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6f104a1489..db453aaa6d 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -130,6 +130,8 @@ statement
| CACHE LAZY? TABLE identifier (AS? query)? #cacheTable
| UNCACHE TABLE identifier #uncacheTable
| CLEAR CACHE #clearCache
+ | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
+ tableIdentifier partitionSpec? #loadData
| ADD identifier .*? #addResource
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
@@ -147,7 +149,7 @@ hiveNativeCommands
| ROLLBACK WORK?
| SHOW PARTITIONS tableIdentifier partitionSpec?
| DFS .*?
- | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOAD) .*?
+ | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE) .*?
;
unsupportedHiveNativeCommands
@@ -651,7 +653,7 @@ nonReserved
| INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION | COMPUTE
| STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
| REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
- | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION
+ | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
| ASC | DESC | LIMIT | RENAME | SETS
;
@@ -881,6 +883,8 @@ INDEXES: 'INDEXES';
LOCKS: 'LOCKS';
OPTION: 'OPTION';
ANTI: 'ANTI';
+LOCAL: 'LOCAL';
+INPATH: 'INPATH';
STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index f8a6fb74cc..36f4f29068 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -205,6 +205,27 @@ class InMemoryCatalog extends ExternalCatalog {
StringUtils.filterPattern(listTables(db), pattern)
}
+ override def loadTable(
+ db: String,
+ table: String,
+ loadPath: String,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean): Unit = {
+ throw new AnalysisException("loadTable is not implemented for InMemoryCatalog.")
+ }
+
+ override def loadPartition(
+ db: String,
+ table: String,
+ loadPath: String,
+ partition: TablePartitionSpec,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit = {
+ throw new AnalysisException("loadPartition is not implemented for InMemoryCatalog.")
+ }
+
// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ab5124ea56..152bd499a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -187,6 +187,40 @@ class SessionCatalog(
externalCatalog.getTableOption(db, table)
}
+ /**
+ * Load files stored in given path into an existing metastore table.
+ * If no database is specified, assume the table is in the current database.
+ * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
+ */
+ def loadTable(
+ name: TableIdentifier,
+ loadPath: String,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean): Unit = {
+ val db = name.database.getOrElse(currentDb)
+ val table = formatTableName(name.table)
+ externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
+ }
+
+ /**
+ * Load files stored in given path into the partition of an existing metastore table.
+ * If no database is specified, assume the table is in the current database.
+ * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
+ */
+ def loadPartition(
+ name: TableIdentifier,
+ loadPath: String,
+ partition: TablePartitionSpec,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit = {
+ val db = name.database.getOrElse(currentDb)
+ val table = formatTableName(name.table)
+ externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime,
+ inheritTableSpecs, isSkewedStoreAsSubdir)
+ }
+
// -------------------------------------------------------------
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 75cbe32c16..fd5bcad0f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -100,6 +100,23 @@ abstract class ExternalCatalog {
def listTables(db: String, pattern: String): Seq[String]
+ def loadTable(
+ db: String,
+ table: String,
+ loadPath: String,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean): Unit
+
+ def loadPartition(
+ db: String,
+ table: String,
+ loadPath: String,
+ partition: TablePartitionSpec,
+ isOverwrite: Boolean,
+ holdDDLTime: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean): Unit
+
// --------------------------------------------------------------------------
// Partitions
// --------------------------------------------------------------------------