diff options
author | Liang-Chi Hsieh <simonh@tw.ibm.com> | 2016-04-22 18:26:28 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-04-22 18:26:28 +0800 |
commit | e09ab5da8b02da98d7b2496d549c1d53cceb8728 (patch) | |
tree | f9aa98a66a2cfdb669d751d3f17d170e99eba0f8 /sql/catalyst/src/main | |
parent | 284b15d2fbff7c0c3ffe8737838071d366ea5742 (diff) | |
download | spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.tar.gz spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.tar.bz2 spark-e09ab5da8b02da98d7b2496d549c1d53cceb8728.zip |
[SPARK-14609][SQL] Native support for LOAD DATA DDL command
## What changes were proposed in this pull request?
Add native support for the LOAD DATA DDL command, which loads data into a Hive table/partition.
## How was this patch tested?
`HiveDDLCommandSuite` and `HiveQuerySuite`. Besides, a few Hive tests (`WindowQuerySuite`, `HiveTableScanSuite` and `HiveSerDeSuite`) also use the `LOAD DATA` command.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #12412 from viirya/ddl-load-data.
Diffstat (limited to 'sql/catalyst/src/main')
4 files changed, 78 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 6f104a1489..db453aaa6d 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -130,6 +130,8 @@ statement | CACHE LAZY? TABLE identifier (AS? query)? #cacheTable | UNCACHE TABLE identifier #uncacheTable | CLEAR CACHE #clearCache + | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE + tableIdentifier partitionSpec? #loadData | ADD identifier .*? #addResource | SET ROLE .*? #failNativeCommand | SET .*? #setConfiguration @@ -147,7 +149,7 @@ hiveNativeCommands | ROLLBACK WORK? | SHOW PARTITIONS tableIdentifier partitionSpec? | DFS .*? - | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOAD) .*? + | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE) .*? ; unsupportedHiveNativeCommands @@ -651,7 +653,7 @@ nonReserved | INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION | COMPUTE | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE - | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION + | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH | ASC | DESC | LIMIT | RENAME | SETS ; @@ -881,6 +883,8 @@ INDEXES: 'INDEXES'; LOCKS: 'LOCKS'; OPTION: 'OPTION'; ANTI: 'ANTI'; +LOCAL: 'LOCAL'; +INPATH: 'INPATH'; STRING : '\'' ( ~('\''|'\\') | ('\\' .) 
)* '\'' diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index f8a6fb74cc..36f4f29068 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -205,6 +205,27 @@ class InMemoryCatalog extends ExternalCatalog { StringUtils.filterPattern(listTables(db), pattern) } + override def loadTable( + db: String, + table: String, + loadPath: String, + isOverwrite: Boolean, + holdDDLTime: Boolean): Unit = { + throw new AnalysisException("loadTable is not implemented for InMemoryCatalog.") + } + + override def loadPartition( + db: String, + table: String, + loadPath: String, + partition: TablePartitionSpec, + isOverwrite: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit = { + throw new AnalysisException("loadPartition is not implemented for InMemoryCatalog.") + } + // -------------------------------------------------------------------------- // Partitions // -------------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index ab5124ea56..152bd499a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -187,6 +187,40 @@ class SessionCatalog( externalCatalog.getTableOption(db, table) } + /** + * Load files stored in given path into an existing metastore table. + * If no database is specified, assume the table is in the current database. 
+ * If the specified table is not found in the database then an [[AnalysisException]] is thrown. + */ + def loadTable( + name: TableIdentifier, + loadPath: String, + isOverwrite: Boolean, + holdDDLTime: Boolean): Unit = { + val db = name.database.getOrElse(currentDb) + val table = formatTableName(name.table) + externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime) + } + + /** + * Load files stored in given path into the partition of an existing metastore table. + * If no database is specified, assume the table is in the current database. + * If the specified table is not found in the database then an [[AnalysisException]] is thrown. + */ + def loadPartition( + name: TableIdentifier, + loadPath: String, + partition: TablePartitionSpec, + isOverwrite: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit = { + val db = name.database.getOrElse(currentDb) + val table = formatTableName(name.table) + externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime, + inheritTableSpecs, isSkewedStoreAsSubdir) + } + // ------------------------------------------------------------- // | Methods that interact with temporary and metastore tables | // ------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 75cbe32c16..fd5bcad0f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -100,6 +100,23 @@ abstract class ExternalCatalog { def listTables(db: String, pattern: String): Seq[String] + def loadTable( + db: String, + table: String, + loadPath: String, + isOverwrite: Boolean, + holdDDLTime: Boolean): Unit + + def loadPartition( + db: String, + table: String, + loadPath: 
String, + partition: TablePartitionSpec, + isOverwrite: Boolean, + holdDDLTime: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean): Unit + // -------------------------------------------------------------------------- // Partitions // -------------------------------------------------------------------------- |