author     Andrew Or <andrew@databricks.com>    2016-02-21 15:00:24 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-02-21 15:00:24 -0800
commit     6c3832b26e119626205732b8fd03c8f5ba986896 (patch)
tree       c23d83055b66647662414f4a5f835ec30efbe64f /sql
parent     7eb83fefd19e137d80a23b5174b66b14831c291a (diff)
[SPARK-13080][SQL] Implement new Catalog API using Hive
## What changes were proposed in this pull request?

This is a step towards merging `SQLContext` and `HiveContext`. A new internal Catalog API was introduced in #10982 and extended in #11069. This patch introduces an implementation of this API using `HiveClient`, an existing interface to Hive. It also extends `HiveClient` with additional calls to Hive that are needed to complete the catalog implementation.

*Where should I start reviewing?* The new catalog introduced is `HiveCatalog`. This class is relatively simple because it just calls `HiveClientImpl`, where most of the new logic is. I would not start with `HiveClient`, `HiveQl`, or `HiveMetastoreCatalog`, which are modified mainly because of a refactor.

*Why is this patch so big?* I had to refactor `HiveClient` to remove an intermediate representation of databases, tables, partitions, etc. After this refactor, `CatalogTable` converts directly to and from `HiveTable` (and likewise for the other types). Otherwise we would have to first convert `CatalogTable` to the intermediate representation and then convert that to `HiveTable`, which is messy. The new class hierarchy is as follows:

```
org.apache.spark.sql.catalyst.catalog.Catalog
  - org.apache.spark.sql.catalyst.catalog.InMemoryCatalog
  - org.apache.spark.sql.hive.HiveCatalog
```

Note that, as of this patch, none of these classes are used anywhere yet. That will come before the Spark 2.0 release.

## How was this patch tested?

All existing unit tests, plus a new `HiveCatalogSuite` that extends `CatalogTestCases`.

Author: Andrew Or <andrew@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #11293 from rxin/hive-catalog.
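For orientation only (this sketch is not part of the patch), here is a minimal example of how the new internal Catalog API could be exercised against `InMemoryCatalog`, based on the case-class and method signatures introduced in this diff. The names `demo_db`, `demo_tbl`, and the column layout are made up for illustration; the same calls would go through `HiveCatalog` (backed by `HiveClientImpl`) once it is wired in.

```scala
// Illustrative sketch of the new Catalog API, using only signatures visible in this diff.
// Database, table, and column names are hypothetical.
import org.apache.spark.sql.catalyst.catalog._

object CatalogApiSketch {
  def main(args: Array[String]): Unit = {
    val catalog: Catalog = new InMemoryCatalog

    // Databases and tables are described with plain case classes
    // (CatalogDatabase, CatalogTable, ...) instead of Hive-specific types.
    catalog.createDatabase(
      CatalogDatabase("demo_db", "example database", "file:/tmp/demo_db", Map.empty),
      ignoreIfExists = false)

    val storage = CatalogStorageFormat(
      locationUri = None,
      inputFormat = None,
      outputFormat = None,
      serde = None,
      serdeProperties = Map.empty)

    catalog.createTable(
      "demo_db",
      CatalogTable(
        specifiedDatabase = Some("demo_db"),
        name = "demo_tbl",
        tableType = CatalogTableType.MANAGED_TABLE,
        storage = storage,
        schema = Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string"))),
      ignoreIfExists = false)

    // Lookups go through the same interface regardless of the backing implementation.
    assert(catalog.listTables("demo_db") == Seq("demo_tbl"))
    assert(catalog.getTable("demo_db", "demo_tbl").qualifiedName == "demo_db.demo_tbl")
  }
}
```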
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala | 3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala | 9
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala | 52
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 154
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 190
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala | 284
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala | 293
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 171
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 145
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala | 210
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 379
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala | 20
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala | 20
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala | 49
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala | 40
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala | 94
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 25
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 37
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala | 2
21 files changed, 1483 insertions, 700 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index f9992185a4..97f28fad62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql
import org.apache.spark.annotation.DeveloperApi
+
+// TODO: don't swallow original stack trace if it exists
+
/**
* :: DeveloperApi ::
* Thrown when a query fails to analyze, usually because the query itself is invalid.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 67edab55db..52b284b757 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -20,20 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-/**
- * Thrown by a catalog when a table cannot be found. The analyzer will rethrow the exception
- * as an AnalysisException with the correct position information.
- */
-class NoSuchTableException extends Exception
-
-class NoSuchDatabaseException extends Exception
/**
* An interface for looking up relations by name. Used by an [[Analyzer]].
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
new file mode 100644
index 0000000000..81399db9bc
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.Catalog.TablePartitionSpec
+
+
+/**
+ * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception
+ * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
+ */
+abstract class NoSuchItemException extends Exception {
+ override def getMessage: String
+}
+
+class NoSuchDatabaseException(db: String) extends NoSuchItemException {
+ override def getMessage: String = s"Database $db not found"
+}
+
+class NoSuchTableException(db: String, table: String) extends NoSuchItemException {
+ override def getMessage: String = s"Table $table not found in database $db"
+}
+
+class NoSuchPartitionException(
+ db: String,
+ table: String,
+ spec: TablePartitionSpec)
+ extends NoSuchItemException {
+
+ override def getMessage: String = {
+ s"Partition not found in table $table database $db:\n" + spec.mkString("\n")
+ }
+}
+
+class NoSuchFunctionException(db: String, func: String) extends NoSuchItemException {
+ override def getMessage: String = s"Function $func not found in database $db"
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 38be61c52a..cba4de34f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -30,15 +30,16 @@ import org.apache.spark.sql.AnalysisException
class InMemoryCatalog extends Catalog {
import Catalog._
- private class TableDesc(var table: Table) {
- val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
+ private class TableDesc(var table: CatalogTable) {
+ val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
}
- private class DatabaseDesc(var db: Database) {
+ private class DatabaseDesc(var db: CatalogDatabase) {
val tables = new mutable.HashMap[String, TableDesc]
- val functions = new mutable.HashMap[String, Function]
+ val functions = new mutable.HashMap[String, CatalogFunction]
}
+ // Database name -> description
private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
private def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
@@ -47,39 +48,33 @@ class InMemoryCatalog extends Catalog {
}
private def existsFunction(db: String, funcName: String): Boolean = {
- assertDbExists(db)
+ requireDbExists(db)
catalog(db).functions.contains(funcName)
}
private def existsTable(db: String, table: String): Boolean = {
- assertDbExists(db)
+ requireDbExists(db)
catalog(db).tables.contains(table)
}
- private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
- assertTableExists(db, table)
+ private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
+ requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
}
- private def assertDbExists(db: String): Unit = {
- if (!catalog.contains(db)) {
- throw new AnalysisException(s"Database $db does not exist")
- }
- }
-
- private def assertFunctionExists(db: String, funcName: String): Unit = {
+ private def requireFunctionExists(db: String, funcName: String): Unit = {
if (!existsFunction(db, funcName)) {
throw new AnalysisException(s"Function $funcName does not exist in $db database")
}
}
- private def assertTableExists(db: String, table: String): Unit = {
+ private def requireTableExists(db: String, table: String): Unit = {
if (!existsTable(db, table)) {
throw new AnalysisException(s"Table $table does not exist in $db database")
}
}
- private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
+ private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
if (!existsPartition(db, table, spec)) {
throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
}
@@ -90,7 +85,7 @@ class InMemoryCatalog extends Catalog {
// --------------------------------------------------------------------------
override def createDatabase(
- dbDefinition: Database,
+ dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
if (!ignoreIfExists) {
@@ -124,17 +119,20 @@ class InMemoryCatalog extends Catalog {
}
}
- override def alterDatabase(db: String, dbDefinition: Database): Unit = synchronized {
- assertDbExists(db)
- assert(db == dbDefinition.name)
- catalog(db).db = dbDefinition
+ override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
+ requireDbExists(dbDefinition.name)
+ catalog(dbDefinition.name).db = dbDefinition
}
- override def getDatabase(db: String): Database = synchronized {
- assertDbExists(db)
+ override def getDatabase(db: String): CatalogDatabase = synchronized {
+ requireDbExists(db)
catalog(db).db
}
+ override def databaseExists(db: String): Boolean = synchronized {
+ catalog.contains(db)
+ }
+
override def listDatabases(): Seq[String] = synchronized {
catalog.keySet.toSeq
}
@@ -143,15 +141,17 @@ class InMemoryCatalog extends Catalog {
filterPattern(listDatabases(), pattern)
}
+ override def setCurrentDatabase(db: String): Unit = { /* no-op */ }
+
// --------------------------------------------------------------------------
// Tables
// --------------------------------------------------------------------------
override def createTable(
db: String,
- tableDefinition: Table,
+ tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
if (existsTable(db, tableDefinition.name)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
@@ -165,7 +165,7 @@ class InMemoryCatalog extends Catalog {
db: String,
table: String,
ignoreIfNotExists: Boolean): Unit = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
if (existsTable(db, table)) {
catalog(db).tables.remove(table)
} else {
@@ -176,31 +176,30 @@ class InMemoryCatalog extends Catalog {
}
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
- assertTableExists(db, oldName)
+ requireTableExists(db, oldName)
val oldDesc = catalog(db).tables(oldName)
oldDesc.table = oldDesc.table.copy(name = newName)
catalog(db).tables.put(newName, oldDesc)
catalog(db).tables.remove(oldName)
}
- override def alterTable(db: String, table: String, tableDefinition: Table): Unit = synchronized {
- assertTableExists(db, table)
- assert(table == tableDefinition.name)
- catalog(db).tables(table).table = tableDefinition
+ override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
+ requireTableExists(db, tableDefinition.name)
+ catalog(db).tables(tableDefinition.name).table = tableDefinition
}
- override def getTable(db: String, table: String): Table = synchronized {
- assertTableExists(db, table)
+ override def getTable(db: String, table: String): CatalogTable = synchronized {
+ requireTableExists(db, table)
catalog(db).tables(table).table
}
override def listTables(db: String): Seq[String] = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
catalog(db).tables.keySet.toSeq
}
override def listTables(db: String, pattern: String): Seq[String] = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
filterPattern(listTables(db), pattern)
}
@@ -211,9 +210,9 @@ class InMemoryCatalog extends Catalog {
override def createPartitions(
db: String,
table: String,
- parts: Seq[TablePartition],
+ parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = synchronized {
- assertTableExists(db, table)
+ requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfExists) {
val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
@@ -229,9 +228,9 @@ class InMemoryCatalog extends Catalog {
override def dropPartitions(
db: String,
table: String,
- partSpecs: Seq[PartitionSpec],
+ partSpecs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit = synchronized {
- assertTableExists(db, table)
+ requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfNotExists) {
val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
@@ -244,30 +243,42 @@ class InMemoryCatalog extends Catalog {
partSpecs.foreach(existingParts.remove)
}
- override def alterPartition(
+ override def renamePartitions(
db: String,
table: String,
- spec: Map[String, String],
- newPart: TablePartition): Unit = synchronized {
- assertPartitionExists(db, table, spec)
- val existingParts = catalog(db).tables(table).partitions
- if (spec != newPart.spec) {
- // Also a change in specs; remove the old one and add the new one back
- existingParts.remove(spec)
+ specs: Seq[TablePartitionSpec],
+ newSpecs: Seq[TablePartitionSpec]): Unit = synchronized {
+ require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+ specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
+ val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
+ val existingParts = catalog(db).tables(table).partitions
+ existingParts.remove(oldSpec)
+ existingParts.put(newSpec, newPart)
+ }
+ }
+
+ override def alterPartitions(
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition]): Unit = synchronized {
+ parts.foreach { p =>
+ requirePartitionExists(db, table, p.spec)
+ catalog(db).tables(table).partitions.put(p.spec, p)
}
- existingParts.put(newPart.spec, newPart)
}
override def getPartition(
db: String,
table: String,
- spec: Map[String, String]): TablePartition = synchronized {
- assertPartitionExists(db, table, spec)
+ spec: TablePartitionSpec): CatalogTablePartition = synchronized {
+ requirePartitionExists(db, table, spec)
catalog(db).tables(table).partitions(spec)
}
- override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
- assertTableExists(db, table)
+ override def listPartitions(
+ db: String,
+ table: String): Seq[CatalogTablePartition] = synchronized {
+ requireTableExists(db, table)
catalog(db).tables(table).partitions.values.toSeq
}
@@ -275,44 +286,39 @@ class InMemoryCatalog extends Catalog {
// Functions
// --------------------------------------------------------------------------
- override def createFunction(
- db: String,
- func: Function,
- ignoreIfExists: Boolean): Unit = synchronized {
- assertDbExists(db)
+ override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
+ requireDbExists(db)
if (existsFunction(db, func.name)) {
- if (!ignoreIfExists) {
- throw new AnalysisException(s"Function $func already exists in $db database")
- }
+ throw new AnalysisException(s"Function $func already exists in $db database")
} else {
catalog(db).functions.put(func.name, func)
}
}
override def dropFunction(db: String, funcName: String): Unit = synchronized {
- assertFunctionExists(db, funcName)
+ requireFunctionExists(db, funcName)
catalog(db).functions.remove(funcName)
}
- override def alterFunction(
- db: String,
- funcName: String,
- funcDefinition: Function): Unit = synchronized {
- assertFunctionExists(db, funcName)
- if (funcName != funcDefinition.name) {
- // Also a rename; remove the old one and add the new one back
- catalog(db).functions.remove(funcName)
- }
+ override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
+ requireFunctionExists(db, oldName)
+ val newFunc = getFunction(db, oldName).copy(name = newName)
+ catalog(db).functions.remove(oldName)
+ catalog(db).functions.put(newName, newFunc)
+ }
+
+ override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
+ requireFunctionExists(db, funcDefinition.name)
catalog(db).functions.put(funcDefinition.name, funcDefinition)
}
- override def getFunction(db: String, funcName: String): Function = synchronized {
- assertFunctionExists(db, funcName)
+ override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
+ requireFunctionExists(db, funcName)
catalog(db).functions(funcName)
}
override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
- assertDbExists(db)
+ requireDbExists(db)
filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 56aaa6bc6c..dac5f023d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import javax.annotation.Nullable
+
import org.apache.spark.sql.AnalysisException
@@ -31,41 +33,59 @@ import org.apache.spark.sql.AnalysisException
abstract class Catalog {
import Catalog._
+ protected def requireDbExists(db: String): Unit = {
+ if (!databaseExists(db)) {
+ throw new AnalysisException(s"Database $db does not exist")
+ }
+ }
+
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
- def createDatabase(dbDefinition: Database, ignoreIfExists: Boolean): Unit
+ def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
- * Alter an existing database. This operation does not support renaming.
+ * Alter a database whose name matches the one specified in `dbDefinition`,
+ * assuming the database exists.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
*/
- def alterDatabase(db: String, dbDefinition: Database): Unit
+ def alterDatabase(dbDefinition: CatalogDatabase): Unit
- def getDatabase(db: String): Database
+ def getDatabase(db: String): CatalogDatabase
+
+ def databaseExists(db: String): Boolean
def listDatabases(): Seq[String]
def listDatabases(pattern: String): Seq[String]
+ def setCurrentDatabase(db: String): Unit
+
// --------------------------------------------------------------------------
// Tables
// --------------------------------------------------------------------------
- def createTable(db: String, tableDefinition: Table, ignoreIfExists: Boolean): Unit
+ def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
def renameTable(db: String, oldName: String, newName: String): Unit
/**
- * Alter an existing table. This operation does not support renaming.
+ * Alter a table whose name that matches the one specified in `tableDefinition`,
+ * assuming the table exists.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
*/
- def alterTable(db: String, table: String, tableDefinition: Table): Unit
+ def alterTable(db: String, tableDefinition: CatalogTable): Unit
- def getTable(db: String, table: String): Table
+ def getTable(db: String, table: String): CatalogTable
def listTables(db: String): Seq[String]
@@ -78,43 +98,62 @@ abstract class Catalog {
def createPartitions(
db: String,
table: String,
- parts: Seq[TablePartition],
+ parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit
def dropPartitions(
db: String,
table: String,
- parts: Seq[PartitionSpec],
+ parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit
/**
- * Alter an existing table partition and optionally override its spec.
+ * Override the specs of one or many existing table partitions, assuming they exist.
+ * This assumes index i of `specs` corresponds to index i of `newSpecs`.
+ */
+ def renamePartitions(
+ db: String,
+ table: String,
+ specs: Seq[TablePartitionSpec],
+ newSpecs: Seq[TablePartitionSpec]): Unit
+
+ /**
+ * Alter one or many table partitions whose specs that match those specified in `parts`,
+ * assuming the partitions exist.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
*/
- def alterPartition(
+ def alterPartitions(
db: String,
table: String,
- spec: PartitionSpec,
- newPart: TablePartition): Unit
+ parts: Seq[CatalogTablePartition]): Unit
- def getPartition(db: String, table: String, spec: PartitionSpec): TablePartition
+ def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition
// TODO: support listing by pattern
- def listPartitions(db: String, table: String): Seq[TablePartition]
+ def listPartitions(db: String, table: String): Seq[CatalogTablePartition]
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
- def createFunction(db: String, funcDefinition: Function, ignoreIfExists: Boolean): Unit
+ def createFunction(db: String, funcDefinition: CatalogFunction): Unit
def dropFunction(db: String, funcName: String): Unit
+ def renameFunction(db: String, oldName: String, newName: String): Unit
+
/**
- * Alter an existing function and optionally override its name.
+ * Alter a function whose name that matches the one specified in `funcDefinition`,
+ * assuming the function exists.
+ *
+ * Note: If the underlying implementation does not support altering a certain field,
+ * this becomes a no-op.
*/
- def alterFunction(db: String, funcName: String, funcDefinition: Function): Unit
+ def alterFunction(db: String, funcDefinition: CatalogFunction): Unit
- def getFunction(db: String, funcName: String): Function
+ def getFunction(db: String, funcName: String): CatalogFunction
def listFunctions(db: String, pattern: String): Seq[String]
@@ -127,33 +166,30 @@ abstract class Catalog {
* @param name name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
*/
-case class Function(
- name: String,
- className: String
-)
+case class CatalogFunction(name: String, className: String)
/**
* Storage format, used to describe how a partition or a table is stored.
*/
-case class StorageFormat(
- locationUri: String,
- inputFormat: String,
- outputFormat: String,
- serde: String,
- serdeProperties: Map[String, String]
-)
+case class CatalogStorageFormat(
+ locationUri: Option[String],
+ inputFormat: Option[String],
+ outputFormat: Option[String],
+ serde: Option[String],
+ serdeProperties: Map[String, String])
/**
* A column in a table.
*/
-case class Column(
- name: String,
- dataType: String,
- nullable: Boolean,
- comment: String
-)
+case class CatalogColumn(
+ name: String,
+ // This may be null when used to create views. TODO: make this type-safe; this is left
+ // as a string due to issues in converting Hive varchars to and from SparkSQL strings.
+ @Nullable dataType: String,
+ nullable: Boolean = true,
+ comment: Option[String] = None)
/**
@@ -162,10 +198,7 @@ case class Column(
* @param spec partition spec values indexed by column name
* @param storage storage format of the partition
*/
-case class TablePartition(
- spec: Catalog.PartitionSpec,
- storage: StorageFormat
-)
+case class CatalogTablePartition(spec: Catalog.TablePartitionSpec, storage: CatalogStorageFormat)
/**
@@ -174,40 +207,65 @@ case class TablePartition(
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
* future once we have a better understanding of how we want to handle skewed columns.
*/
-case class Table(
- name: String,
- description: String,
- schema: Seq[Column],
- partitionColumns: Seq[Column],
- sortColumns: Seq[Column],
- storage: StorageFormat,
- numBuckets: Int,
- properties: Map[String, String],
- tableType: String,
- createTime: Long,
- lastAccessTime: Long,
- viewOriginalText: Option[String],
- viewText: Option[String]) {
-
- require(tableType == "EXTERNAL_TABLE" || tableType == "INDEX_TABLE" ||
- tableType == "MANAGED_TABLE" || tableType == "VIRTUAL_VIEW")
+case class CatalogTable(
+ specifiedDatabase: Option[String],
+ name: String,
+ tableType: CatalogTableType,
+ storage: CatalogStorageFormat,
+ schema: Seq[CatalogColumn],
+ partitionColumns: Seq[CatalogColumn] = Seq.empty,
+ sortColumns: Seq[CatalogColumn] = Seq.empty,
+ numBuckets: Int = 0,
+ createTime: Long = System.currentTimeMillis,
+ lastAccessTime: Long = System.currentTimeMillis,
+ properties: Map[String, String] = Map.empty,
+ viewOriginalText: Option[String] = None,
+ viewText: Option[String] = None) {
+
+ /** Return the database this table was specified to belong to, assuming it exists. */
+ def database: String = specifiedDatabase.getOrElse {
+ throw new AnalysisException(s"table $name did not specify database")
+ }
+
+ /** Return the fully qualified name of this table, assuming the database was specified. */
+ def qualifiedName: String = s"$database.$name"
+
+ /** Syntactic sugar to update a field in `storage`. */
+ def withNewStorage(
+ locationUri: Option[String] = storage.locationUri,
+ inputFormat: Option[String] = storage.inputFormat,
+ outputFormat: Option[String] = storage.outputFormat,
+ serde: Option[String] = storage.serde,
+ serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = {
+ copy(storage = CatalogStorageFormat(
+ locationUri, inputFormat, outputFormat, serde, serdeProperties))
+ }
+
+}
+
+
+case class CatalogTableType private(name: String)
+object CatalogTableType {
+ val EXTERNAL_TABLE = new CatalogTableType("EXTERNAL_TABLE")
+ val MANAGED_TABLE = new CatalogTableType("MANAGED_TABLE")
+ val INDEX_TABLE = new CatalogTableType("INDEX_TABLE")
+ val VIRTUAL_VIEW = new CatalogTableType("VIRTUAL_VIEW")
}
/**
* A database defined in the catalog.
*/
-case class Database(
- name: String,
- description: String,
- locationUri: String,
- properties: Map[String, String]
-)
+case class CatalogDatabase(
+ name: String,
+ description: String,
+ locationUri: String,
+ properties: Map[String, String])
object Catalog {
/**
- * Specifications of a table partition indexed by column name.
+ * Specifications of a table partition. Mapping column name to column value.
*/
- type PartitionSpec = Map[String, String]
+ type TablePartitionSpec = Map[String, String]
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 45c5ceecb0..e0d1220d13 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import org.scalatest.BeforeAndAfterEach
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
@@ -26,18 +28,38 @@ import org.apache.spark.sql.AnalysisException
*
* Implementations of the [[Catalog]] interface can create test suites by extending this.
*/
-abstract class CatalogTestCases extends SparkFunSuite {
- private val storageFormat = StorageFormat("usa", "$", "zzz", "serde", Map())
- private val part1 = TablePartition(Map("a" -> "1"), storageFormat)
- private val part2 = TablePartition(Map("b" -> "2"), storageFormat)
- private val part3 = TablePartition(Map("c" -> "3"), storageFormat)
+abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
+ private lazy val storageFormat = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = Some(tableInputFormat),
+ outputFormat = Some(tableOutputFormat),
+ serde = None,
+ serdeProperties = Map.empty)
+ private lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
+ private lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
+ private lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
private val funcClass = "org.apache.spark.myFunc"
+ // Things subclasses should override
+ protected val tableInputFormat: String = "org.apache.park.serde.MyInputFormat"
+ protected val tableOutputFormat: String = "org.apache.park.serde.MyOutputFormat"
+ protected def newUriForDatabase(): String = "uri"
+ protected def resetState(): Unit = { }
protected def newEmptyCatalog(): Catalog
+ // Clear all state after each test
+ override def afterEach(): Unit = {
+ try {
+ resetState()
+ } finally {
+ super.afterEach()
+ }
+ }
+
/**
* Creates a basic catalog, with the following structure:
*
+ * default
* db1
* db2
* - tbl1
@@ -48,37 +70,65 @@ abstract class CatalogTestCases extends SparkFunSuite {
*/
private def newBasicCatalog(): Catalog = {
val catalog = newEmptyCatalog()
+ // When testing against a real catalog, the default database may already exist
+ catalog.createDatabase(newDb("default"), ignoreIfExists = true)
catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
- catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
- catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
+ catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
+ catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
- catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("func1"))
catalog
}
- private def newFunc(): Function = Function("funcname", funcClass)
+ private def newFunc(): CatalogFunction = CatalogFunction("funcname", funcClass)
+
+ private def newDb(name: String): CatalogDatabase = {
+ CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
+ }
+
+ private def newTable(name: String, db: String): CatalogTable = {
+ CatalogTable(
+ specifiedDatabase = Some(db),
+ name = name,
+ tableType = CatalogTableType.EXTERNAL_TABLE,
+ storage = storageFormat,
+ schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
+ partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
+ }
- private def newDb(name: String = "default"): Database =
- Database(name, name + " description", "uri", Map.empty)
+ private def newFunc(name: String): CatalogFunction = CatalogFunction(name, funcClass)
- private def newTable(name: String): Table =
- Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
- None, None)
+ /**
+ * Whether the catalog's table partitions equal the ones given.
+ * Note: Hive sets some random serde things, so we just compare the specs here.
+ */
+ private def catalogPartitionsEqual(
+ catalog: Catalog,
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition]): Boolean = {
+ catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
+ }
- private def newFunc(name: String): Function = Function(name, funcClass)
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
- test("basic create, drop and list databases") {
+ test("basic create and list databases") {
val catalog = newEmptyCatalog()
- catalog.createDatabase(newDb(), ignoreIfExists = false)
- assert(catalog.listDatabases().toSet == Set("default"))
-
- catalog.createDatabase(newDb("default2"), ignoreIfExists = false)
- assert(catalog.listDatabases().toSet == Set("default", "default2"))
+ catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+ assert(catalog.databaseExists("default"))
+ assert(!catalog.databaseExists("testing"))
+ assert(!catalog.databaseExists("testing2"))
+ catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
+ assert(catalog.databaseExists("testing"))
+ assert(catalog.listDatabases().toSet == Set("default", "testing"))
+ catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
+ assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
+ assert(catalog.databaseExists("testing2"))
+ assert(!catalog.databaseExists("does_not_exist"))
}
test("get database when a database exists") {
@@ -93,7 +143,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("list databases without pattern") {
val catalog = newBasicCatalog()
- assert(catalog.listDatabases().toSet == Set("db1", "db2"))
+ assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
}
test("list databases with pattern") {
@@ -107,7 +157,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("drop database") {
val catalog = newBasicCatalog()
catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
- assert(catalog.listDatabases().toSet == Set("db2"))
+ assert(catalog.listDatabases().toSet == Set("default", "db2"))
}
test("drop database when the database is not empty") {
@@ -118,6 +168,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
intercept[AnalysisException] {
catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
}
+ resetState()
// Throw exception if there are tables left
val catalog2 = newBasicCatalog()
@@ -125,11 +176,12 @@ abstract class CatalogTestCases extends SparkFunSuite {
intercept[AnalysisException] {
catalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
}
+ resetState()
// When cascade is true, it should drop them
val catalog3 = newBasicCatalog()
catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
- assert(catalog3.listDatabases().toSet == Set("db1"))
+ assert(catalog3.listDatabases().toSet == Set("default", "db1"))
}
test("drop database when the database does not exist") {
@@ -144,13 +196,19 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("alter database") {
val catalog = newBasicCatalog()
- catalog.alterDatabase("db1", Database("db1", "new description", "lll", Map.empty))
- assert(catalog.getDatabase("db1").description == "new description")
+ val db1 = catalog.getDatabase("db1")
+ // Note: alter properties here because Hive does not support altering other fields
+ catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
+ val newDb1 = catalog.getDatabase("db1")
+ assert(db1.properties.isEmpty)
+ assert(newDb1.properties.size == 2)
+ assert(newDb1.properties.get("k") == Some("v3"))
+ assert(newDb1.properties.get("good") == Some("true"))
}
test("alter database should throw exception when the database does not exist") {
intercept[AnalysisException] {
- newBasicCatalog().alterDatabase("no_db", Database("no_db", "ddd", "lll", Map.empty))
+ newBasicCatalog().alterDatabase(newDb("does_not_exist"))
}
}
@@ -165,61 +223,56 @@ abstract class CatalogTestCases extends SparkFunSuite {
assert(catalog.listTables("db2").toSet == Set("tbl2"))
}
- test("drop table when database / table does not exist") {
+ test("drop table when database/table does not exist") {
val catalog = newBasicCatalog()
-
// Should always throw exception when the database does not exist
intercept[AnalysisException] {
catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
}
-
intercept[AnalysisException] {
catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
}
-
// Should throw exception when the table does not exist, if ignoreIfNotExists is false
intercept[AnalysisException] {
catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
}
-
catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
}
test("rename table") {
val catalog = newBasicCatalog()
-
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
catalog.renameTable("db2", "tbl1", "tblone")
assert(catalog.listTables("db2").toSet == Set("tblone", "tbl2"))
}
- test("rename table when database / table does not exist") {
+ test("rename table when database/table does not exist") {
val catalog = newBasicCatalog()
-
- intercept[AnalysisException] { // Throw exception when the database does not exist
+ intercept[AnalysisException] {
catalog.renameTable("unknown_db", "unknown_table", "unknown_table")
}
-
- intercept[AnalysisException] { // Throw exception when the table does not exist
+ intercept[AnalysisException] {
catalog.renameTable("db2", "unknown_table", "unknown_table")
}
}
test("alter table") {
val catalog = newBasicCatalog()
- catalog.alterTable("db2", "tbl1", newTable("tbl1").copy(createTime = 10))
- assert(catalog.getTable("db2", "tbl1").createTime == 10)
+ val tbl1 = catalog.getTable("db2", "tbl1")
+ catalog.alterTable("db2", tbl1.copy(properties = Map("toh" -> "frem")))
+ val newTbl1 = catalog.getTable("db2", "tbl1")
+ assert(!tbl1.properties.contains("toh"))
+ assert(newTbl1.properties.size == tbl1.properties.size + 1)
+ assert(newTbl1.properties.get("toh") == Some("frem"))
}
- test("alter table when database / table does not exist") {
+ test("alter table when database/table does not exist") {
val catalog = newBasicCatalog()
-
- intercept[AnalysisException] { // Throw exception when the database does not exist
- catalog.alterTable("unknown_db", "unknown_table", newTable("unknown_table"))
+ intercept[AnalysisException] {
+ catalog.alterTable("unknown_db", newTable("tbl1", "unknown_db"))
}
-
- intercept[AnalysisException] { // Throw exception when the table does not exist
- catalog.alterTable("db2", "unknown_table", newTable("unknown_table"))
+ intercept[AnalysisException] {
+ catalog.alterTable("db2", newTable("unknown_table", "db2"))
}
}
@@ -227,12 +280,11 @@ abstract class CatalogTestCases extends SparkFunSuite {
assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
}
- test("get table when database / table does not exist") {
+ test("get table when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.getTable("unknown_db", "unknown_table")
}
-
intercept[AnalysisException] {
catalog.getTable("db2", "unknown_table")
}
@@ -246,10 +298,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("list tables with pattern") {
val catalog = newBasicCatalog()
-
- // Test when database does not exist
intercept[AnalysisException] { catalog.listTables("unknown_db") }
-
assert(catalog.listTables("db1", "*").toSet == Set.empty)
assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))
@@ -263,12 +312,12 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("basic create and list partitions") {
val catalog = newEmptyCatalog()
catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
- catalog.createTable("mydb", newTable("mytbl"), ignoreIfExists = false)
- catalog.createPartitions("mydb", "mytbl", Seq(part1, part2), ignoreIfExists = false)
- assert(catalog.listPartitions("mydb", "mytbl").toSet == Set(part1, part2))
+ catalog.createTable("mydb", newTable("tbl", "mydb"), ignoreIfExists = false)
+ catalog.createPartitions("mydb", "tbl", Seq(part1, part2), ignoreIfExists = false)
+ assert(catalogPartitionsEqual(catalog, "mydb", "tbl", Seq(part1, part2)))
}
- test("create partitions when database / table does not exist") {
+ test("create partitions when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
@@ -288,16 +337,17 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("drop partitions") {
val catalog = newBasicCatalog()
- assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+ assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
- assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part2))
+ assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
+ resetState()
val catalog2 = newBasicCatalog()
- assert(catalog2.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+ assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
}
- test("drop partitions when database / table does not exist") {
+ test("drop partitions when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
@@ -317,14 +367,14 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("get partition") {
val catalog = newBasicCatalog()
- assert(catalog.getPartition("db2", "tbl2", part1.spec) == part1)
- assert(catalog.getPartition("db2", "tbl2", part2.spec) == part2)
+ assert(catalog.getPartition("db2", "tbl2", part1.spec).spec == part1.spec)
+ assert(catalog.getPartition("db2", "tbl2", part2.spec).spec == part2.spec)
intercept[AnalysisException] {
catalog.getPartition("db2", "tbl1", part3.spec)
}
}
- test("get partition when database / table does not exist") {
+ test("get partition when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.getPartition("does_not_exist", "tbl1", part1.spec)
@@ -334,28 +384,69 @@ abstract class CatalogTestCases extends SparkFunSuite {
}
}
- test("alter partitions") {
+ test("rename partitions") {
+ val catalog = newBasicCatalog()
+ val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
+ val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
+ val newSpecs = Seq(newPart1.spec, newPart2.spec)
+ catalog.renamePartitions("db2", "tbl2", Seq(part1.spec, part2.spec), newSpecs)
+ assert(catalog.getPartition("db2", "tbl2", newPart1.spec).spec === newPart1.spec)
+ assert(catalog.getPartition("db2", "tbl2", newPart2.spec).spec === newPart2.spec)
+ // The old partitions should no longer exist
+ intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part1.spec) }
+ intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
+ }
+
+ test("rename partitions when database/table does not exist") {
val catalog = newBasicCatalog()
- val partSameSpec = part1.copy(storage = storageFormat.copy(serde = "myserde"))
- val partNewSpec = part1.copy(spec = Map("x" -> "10"))
- // alter but keep spec the same
- catalog.alterPartition("db2", "tbl2", part1.spec, partSameSpec)
- assert(catalog.getPartition("db2", "tbl2", part1.spec) == partSameSpec)
- // alter and change spec
- catalog.alterPartition("db2", "tbl2", part1.spec, partNewSpec)
intercept[AnalysisException] {
- catalog.getPartition("db2", "tbl2", part1.spec)
+ catalog.renamePartitions("does_not_exist", "tbl1", Seq(part1.spec), Seq(part2.spec))
+ }
+ intercept[AnalysisException] {
+ catalog.renamePartitions("db2", "does_not_exist", Seq(part1.spec), Seq(part2.spec))
}
- assert(catalog.getPartition("db2", "tbl2", partNewSpec.spec) == partNewSpec)
}
- test("alter partition when database / table does not exist") {
+ test("alter partitions") {
+ val catalog = newBasicCatalog()
+ try{
+ // Note: Before altering table partitions in Hive, you *must* set the current database
+ // to the one that contains the table of interest. Otherwise you will end up with the
+ // most helpful error message ever: "Unable to alter partition. alter is not possible."
+ // See HIVE-2742 for more detail.
+ catalog.setCurrentDatabase("db2")
+ val newLocation = newUriForDatabase()
+ // alter but keep spec the same
+ val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+ val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+ catalog.alterPartitions("db2", "tbl2", Seq(
+ oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
+ oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+ val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
+ val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
+ assert(newPart1.storage.locationUri == Some(newLocation))
+ assert(newPart2.storage.locationUri == Some(newLocation))
+ assert(oldPart1.storage.locationUri != Some(newLocation))
+ assert(oldPart2.storage.locationUri != Some(newLocation))
+ // alter but change spec, should fail because new partition specs do not exist yet
+ val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
+ val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
+ intercept[AnalysisException] {
+ catalog.alterPartitions("db2", "tbl2", Seq(badPart1, badPart2))
+ }
+ } finally {
+ // Remember to restore the original current database, which we assume to be "default"
+ catalog.setCurrentDatabase("default")
+ }
+ }
+
+ test("alter partitions when database/table does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.alterPartition("does_not_exist", "tbl1", part1.spec, part1)
+ catalog.alterPartitions("does_not_exist", "tbl1", Seq(part1))
}
intercept[AnalysisException] {
- catalog.alterPartition("db2", "does_not_exist", part1.spec, part1)
+ catalog.alterPartitions("db2", "does_not_exist", Seq(part1))
}
}
@@ -366,23 +457,22 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("basic create and list functions") {
val catalog = newEmptyCatalog()
catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
- catalog.createFunction("mydb", newFunc("myfunc"), ignoreIfExists = false)
+ catalog.createFunction("mydb", newFunc("myfunc"))
assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
}
test("create function when database does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.createFunction("does_not_exist", newFunc(), ignoreIfExists = false)
+ catalog.createFunction("does_not_exist", newFunc())
}
}
test("create function that already exists") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("func1"))
}
- catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = true)
}
test("drop function") {
@@ -421,31 +511,43 @@ abstract class CatalogTestCases extends SparkFunSuite {
}
}
- test("alter function") {
+ test("rename function") {
val catalog = newBasicCatalog()
+ val newName = "funcky"
assert(catalog.getFunction("db2", "func1").className == funcClass)
- // alter func but keep name
- catalog.alterFunction("db2", "func1", newFunc("func1").copy(className = "muhaha"))
- assert(catalog.getFunction("db2", "func1").className == "muhaha")
- // alter func and change name
- catalog.alterFunction("db2", "func1", newFunc("funcky"))
+ catalog.renameFunction("db2", "func1", newName)
+ intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+ assert(catalog.getFunction("db2", newName).name == newName)
+ assert(catalog.getFunction("db2", newName).className == funcClass)
+ intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+ }
+
+ test("rename function when database does not exist") {
+ val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.getFunction("db2", "func1")
+ catalog.renameFunction("does_not_exist", "func1", "func5")
}
- assert(catalog.getFunction("db2", "funcky").className == funcClass)
+ }
+
+ test("alter function") {
+ val catalog = newBasicCatalog()
+ assert(catalog.getFunction("db2", "func1").className == funcClass)
+ catalog.alterFunction("db2", newFunc("func1").copy(className = "muhaha"))
+ assert(catalog.getFunction("db2", "func1").className == "muhaha")
+ intercept[AnalysisException] { catalog.alterFunction("db2", newFunc("funcky")) }
}
test("alter function when database does not exist") {
val catalog = newBasicCatalog()
intercept[AnalysisException] {
- catalog.alterFunction("does_not_exist", "func1", newFunc())
+ catalog.alterFunction("does_not_exist", newFunc())
}
}
test("list functions") {
val catalog = newBasicCatalog()
- catalog.createFunction("db2", newFunc("func2"), ignoreIfExists = false)
- catalog.createFunction("db2", newFunc("not_me"), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("func2"))
+ catalog.createFunction("db2", newFunc("not_me"))
assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
new file mode 100644
index 0000000000..21b9cfb820
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.thrift.TException
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+/**
+ * A persistent implementation of the system catalog using Hive.
+ * All public methods must be synchronized for thread-safety.
+ */
+private[spark] class HiveCatalog(client: HiveClient) extends Catalog with Logging {
+ import Catalog._
+
+ // Exceptions thrown by the hive client that we would like to wrap
+ private val clientExceptions = Set(
+ classOf[HiveException].getCanonicalName,
+ classOf[TException].getCanonicalName)
+
+ /**
+ * Whether this is an exception thrown by the hive client that should be wrapped.
+ *
+ * Due to classloader isolation issues, pattern matching won't work here so we need
+ * to compare the canonical names of the exceptions, which we assume to be stable.
+ */
+ private def isClientException(e: Throwable): Boolean = {
+ var temp: Class[_] = e.getClass
+ var found = false
+ while (temp != null && !found) {
+ found = clientExceptions.contains(temp.getCanonicalName)
+ temp = temp.getSuperclass
+ }
+ found
+ }
+
+ /**
+ * Run some code involving `client` in a [[synchronized]] block and wrap certain
+ * exceptions thrown in the process in [[AnalysisException]].
+ */
+ private def withClient[T](body: => T): T = synchronized {
+ try {
+ body
+ } catch {
+ case e: NoSuchItemException =>
+ throw new AnalysisException(e.getMessage)
+ case NonFatal(e) if isClientException(e) =>
+ throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+ }
+ }
+
+ private def requireDbMatches(db: String, table: CatalogTable): Unit = {
+ if (table.specifiedDatabase != Some(db)) {
+ throw new AnalysisException(
+ s"Provided database $db does not much the one specified in the " +
+ s"table definition (${table.specifiedDatabase.getOrElse("n/a")})")
+ }
+ }
+
+ private def requireTableExists(db: String, table: String): Unit = {
+ withClient { getTable(db, table) }
+ }
+
+
+ // --------------------------------------------------------------------------
+ // Databases
+ // --------------------------------------------------------------------------
+
+ override def createDatabase(
+ dbDefinition: CatalogDatabase,
+ ignoreIfExists: Boolean): Unit = withClient {
+ client.createDatabase(dbDefinition, ignoreIfExists)
+ }
+
+ override def dropDatabase(
+ db: String,
+ ignoreIfNotExists: Boolean,
+ cascade: Boolean): Unit = withClient {
+ client.dropDatabase(db, ignoreIfNotExists, cascade)
+ }
+
+ /**
+ * Alter a database whose name matches the one specified in `dbDefinition`,
+ * assuming the database exists.
+ *
+ * Note: As of now, this only supports altering database properties!
+ */
+ override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+ val existingDb = getDatabase(dbDefinition.name)
+ if (existingDb.properties == dbDefinition.properties) {
+ logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
+ s"the provided database properties are the same as the old ones. Hive does not " +
+ s"currently support altering other database fields.")
+ }
+ client.alterDatabase(dbDefinition)
+ }
+
+ override def getDatabase(db: String): CatalogDatabase = withClient {
+ client.getDatabase(db)
+ }
+
+ override def databaseExists(db: String): Boolean = withClient {
+ client.getDatabaseOption(db).isDefined
+ }
+
+ override def listDatabases(): Seq[String] = withClient {
+ client.listDatabases("*")
+ }
+
+ override def listDatabases(pattern: String): Seq[String] = withClient {
+ client.listDatabases(pattern)
+ }
+
+ override def setCurrentDatabase(db: String): Unit = withClient {
+ client.setCurrentDatabase(db)
+ }
+
+ // --------------------------------------------------------------------------
+ // Tables
+ // --------------------------------------------------------------------------
+
+ override def createTable(
+ db: String,
+ tableDefinition: CatalogTable,
+ ignoreIfExists: Boolean): Unit = withClient {
+ requireDbExists(db)
+ requireDbMatches(db, tableDefinition)
+ client.createTable(tableDefinition, ignoreIfExists)
+ }
+
+ override def dropTable(
+ db: String,
+ table: String,
+ ignoreIfNotExists: Boolean): Unit = withClient {
+ requireDbExists(db)
+ client.dropTable(db, table, ignoreIfNotExists)
+ }
+
+ override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+ val newTable = client.getTable(db, oldName).copy(name = newName)
+ client.alterTable(oldName, newTable)
+ }
+
+ /**
+   * Alter a table whose name matches the one specified in `tableDefinition`,
+ * assuming the table exists.
+ *
+ * Note: As of now, this only supports altering table properties, serde properties,
+ * and num buckets!
+ */
+ override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
+ requireDbMatches(db, tableDefinition)
+ requireTableExists(db, tableDefinition.name)
+ client.alterTable(tableDefinition)
+ }
+
+ override def getTable(db: String, table: String): CatalogTable = withClient {
+ client.getTable(db, table)
+ }
+
+ override def listTables(db: String): Seq[String] = withClient {
+ requireDbExists(db)
+ client.listTables(db)
+ }
+
+ override def listTables(db: String, pattern: String): Seq[String] = withClient {
+ requireDbExists(db)
+ client.listTables(db, pattern)
+ }
+
+ // --------------------------------------------------------------------------
+ // Partitions
+ // --------------------------------------------------------------------------
+
+ override def createPartitions(
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = withClient {
+ requireTableExists(db, table)
+ client.createPartitions(db, table, parts, ignoreIfExists)
+ }
+
+ override def dropPartitions(
+ db: String,
+ table: String,
+ parts: Seq[TablePartitionSpec],
+ ignoreIfNotExists: Boolean): Unit = withClient {
+ requireTableExists(db, table)
+    // Note: Unfortunately, Hive does not currently support `ignoreIfNotExists`, so we
+ // need to implement it here ourselves. This is currently somewhat expensive because
+ // we make multiple synchronous calls to Hive for each partition we want to drop.
+ val partsToDrop =
+ if (ignoreIfNotExists) {
+ parts.filter { spec =>
+ try {
+ getPartition(db, table, spec)
+ true
+ } catch {
+ // Filter out the partitions that do not actually exist
+ case _: AnalysisException => false
+ }
+ }
+ } else {
+ parts
+ }
+ if (partsToDrop.nonEmpty) {
+ client.dropPartitions(db, table, partsToDrop)
+ }
+ }
+
+ override def renamePartitions(
+ db: String,
+ table: String,
+ specs: Seq[TablePartitionSpec],
+ newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
+ client.renamePartitions(db, table, specs, newSpecs)
+ }
+
+ override def alterPartitions(
+ db: String,
+ table: String,
+ newParts: Seq[CatalogTablePartition]): Unit = withClient {
+ client.alterPartitions(db, table, newParts)
+ }
+
+ override def getPartition(
+ db: String,
+ table: String,
+ spec: TablePartitionSpec): CatalogTablePartition = withClient {
+ client.getPartition(db, table, spec)
+ }
+
+ override def listPartitions(
+ db: String,
+ table: String): Seq[CatalogTablePartition] = withClient {
+ client.getAllPartitions(db, table)
+ }
+
+ // --------------------------------------------------------------------------
+ // Functions
+ // --------------------------------------------------------------------------
+
+ override def createFunction(
+ db: String,
+ funcDefinition: CatalogFunction): Unit = withClient {
+ client.createFunction(db, funcDefinition)
+ }
+
+ override def dropFunction(db: String, name: String): Unit = withClient {
+ client.dropFunction(db, name)
+ }
+
+ override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+ client.renameFunction(db, oldName, newName)
+ }
+
+ override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient {
+ client.alterFunction(db, funcDefinition)
+ }
+
+ override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+ client.getFunction(db, funcName)
+ }
+
+ override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+ client.listFunctions(db, pattern)
+ }
+
+}
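
For reference, a minimal usage sketch (not part of the patch) of `HiveCatalog` through the new Catalog API. It assumes a `HiveClient` named `client` is already constructed, and that the code lives under the `org.apache.spark` package since the class is `private[spark]`; the database and table names are illustrative only.

```
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.HiveCatalog

val catalog = new HiveCatalog(client)  // `client: HiveClient` assumed to exist

// Create a database; ignoreIfExists = true makes the call idempotent.
catalog.createDatabase(
  CatalogDatabase(
    name = "sales",
    description = "illustrative database",
    locationUri = "/user/hive/warehouse/sales.db",
    properties = Map.empty),
  ignoreIfExists = true)

// Create a managed table with no explicit storage format.
catalog.createTable(
  db = "sales",
  tableDefinition = CatalogTable(
    specifiedDatabase = Some("sales"),
    name = "orders",
    tableType = CatalogTableType.MANAGED_TABLE,
    schema = Seq(CatalogColumn("id", "int"), CatalogColumn("amount", "double")),
    storage = CatalogStorageFormat(
      locationUri = None,
      inputFormat = None,
      outputFormat = None,
      serde = None,
      serdeProperties = Map.empty)),
  ignoreIfExists = false)

assert(catalog.listTables("sales").contains("orders"))
```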
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c222b006a0..3788736fd1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,15 +25,16 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.Warehouse
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType, Warehouse}
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata._
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.Logging
import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,6 +97,8 @@ private[hive] object HiveSerDe {
}
}
+
+// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext
private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
extends Catalog with Logging {
@@ -107,16 +110,16 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
- private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+ private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
QualifiedTableName(
tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
tableIdent.table.toLowerCase)
}
- private def getQualifiedTableName(hiveTable: HiveTable) = {
+ private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
QualifiedTableName(
- hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
- hiveTable.name.toLowerCase)
+ t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+ t.name.toLowerCase)
}
/** A cache of Spark SQL data source tables that have been accessed. */
@@ -175,7 +178,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
- val options = table.serdeProperties
+ val options = table.storage.serdeProperties
val resolvedRelation =
ResolvedDataSource(
@@ -276,53 +279,54 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
- ExternalTable
+ CatalogTableType.EXTERNAL_TABLE
} else {
tableProperties.put("EXTERNAL", "FALSE")
- ManagedTable
+ CatalogTableType.MANAGED_TABLE
}
val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
val dataSource = ResolvedDataSource(
hive, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options)
- def newSparkSQLSpecificMetastoreTable(): HiveTable = {
- HiveTable(
+ def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+ CatalogTable(
specifiedDatabase = Option(dbName),
name = tblName,
- schema = Nil,
- partitionColumns = Nil,
tableType = tableType,
- properties = tableProperties.toMap,
- serdeProperties = options)
+ schema = Nil,
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ serdeProperties = options
+ ),
+ properties = tableProperties.toMap)
}
- def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = {
- def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = {
- schema.map { field =>
- HiveColumn(
- name = field.name,
- hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
- comment = "")
- }
- }
-
+ def newHiveCompatibleMetastoreTable(
+ relation: HadoopFsRelation,
+ serde: HiveSerDe): CatalogTable = {
assert(partitionColumns.isEmpty)
assert(relation.partitionColumns.isEmpty)
- HiveTable(
+ CatalogTable(
specifiedDatabase = Option(dbName),
name = tblName,
- schema = schemaToHiveColumn(relation.schema),
- partitionColumns = Nil,
tableType = tableType,
+ storage = CatalogStorageFormat(
+ locationUri = Some(relation.paths.head),
+ inputFormat = serde.inputFormat,
+ outputFormat = serde.outputFormat,
+ serde = serde.serde,
+ serdeProperties = options
+ ),
+ schema = relation.schema.map { f =>
+ CatalogColumn(f.name, HiveMetastoreTypes.toMetastoreType(f.dataType))
+ },
properties = tableProperties.toMap,
- serdeProperties = options,
- location = Some(relation.paths.head),
- viewText = None, // TODO We need to place the SQL string here.
- inputFormat = serde.inputFormat,
- outputFormat = serde.outputFormat,
- serde = serde.serde)
+ viewText = None) // TODO: We need to place the SQL string here
}
// TODO: Support persisting partitioned data source relations in Hive compatible format
@@ -379,7 +383,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// specific way.
try {
logInfo(message)
- client.createTable(table)
+ client.createTable(table, ignoreIfExists = false)
} catch {
case throwable: Throwable =>
val warningMessage =
@@ -387,20 +391,20 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
s"it into Hive metastore in Spark SQL specific format."
logWarning(warningMessage, throwable)
val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
- client.createTable(sparkSqlSpecificTable)
+ client.createTable(sparkSqlSpecificTable, ignoreIfExists = false)
}
case (None, message) =>
logWarning(message)
val hiveTable = newSparkSQLSpecificMetastoreTable()
- client.createTable(hiveTable)
+ client.createTable(hiveTable, ignoreIfExists = false)
}
}
def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
- new Path(new Path(client.getDatabase(dbName).location), tblName).toString
+ new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
}
override def tableExists(tableIdent: TableIdentifier): Boolean = {
@@ -420,7 +424,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
- } else if (table.tableType == VirtualView) {
+ } else if (table.tableType == CatalogTableType.VIRTUAL_VIEW) {
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
alias match {
// because hive use things like `_c0` to build the expanded text
@@ -429,7 +433,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
}
} else {
- MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
+ MetastoreRelation(
+ qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, hive)
}
}
@@ -602,16 +607,14 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val schema = if (table.schema.nonEmpty) {
table.schema
} else {
- child.output.map {
- attr => new HiveColumn(
- attr.name,
- HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+ child.output.map { a =>
+ CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType), a.nullable)
}
}
val desc = table.copy(schema = schema)
- if (hive.convertCTAS && table.serde.isEmpty) {
+ if (hive.convertCTAS && table.storage.serde.isEmpty) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
if (table.specifiedDatabase.isDefined) {
@@ -632,9 +635,9 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
child
)
} else {
- val desc = if (table.serde.isEmpty) {
+ val desc = if (table.storage.serde.isEmpty) {
// add default serde
- table.copy(
+ table.withNewStorage(
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
table
@@ -744,10 +747,13 @@ private[hive] case class InsertIntoHiveTable(
}
}
-private[hive] case class MetastoreRelation
- (databaseName: String, tableName: String, alias: Option[String])
- (val table: HiveTable)
- (@transient private val sqlContext: SQLContext)
+private[hive] case class MetastoreRelation(
+ databaseName: String,
+ tableName: String,
+ alias: Option[String])
+ (val table: CatalogTable,
+ @transient private val client: HiveClient,
+ @transient private val sqlContext: SQLContext)
extends LeafNode with MultiInstanceRelation with FileRelation {
override def equals(other: Any): Boolean = other match {
@@ -765,7 +771,12 @@ private[hive] case class MetastoreRelation
override protected def otherCopyArgs: Seq[AnyRef] = table :: sqlContext :: Nil
- @transient val hiveQlTable: Table = {
+ private def toHiveColumn(c: CatalogColumn): FieldSchema = {
+ new FieldSchema(c.name, c.dataType, c.comment.orNull)
+ }
+
+ // TODO: merge this with HiveClientImpl#toHiveTable
+ @transient val hiveQlTable: HiveTable = {
// We start by constructing an API table as Hive performs several important transformations
// internally when converting an API table to a QL table.
val tTable = new org.apache.hadoop.hive.metastore.api.Table()
@@ -776,27 +787,31 @@ private[hive] case class MetastoreRelation
tTable.setParameters(tableParameters)
table.properties.foreach { case (k, v) => tableParameters.put(k, v) }
- tTable.setTableType(table.tableType.name)
+ tTable.setTableType(table.tableType match {
+ case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE.toString
+ case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE.toString
+ case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE.toString
+ case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW.toString
+ })
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tTable.setSd(sd)
- sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
- tTable.setPartitionKeys(
- table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+ sd.setCols(table.schema.map(toHiveColumn).asJava)
+ tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava)
- table.location.foreach(sd.setLocation)
- table.inputFormat.foreach(sd.setInputFormat)
- table.outputFormat.foreach(sd.setOutputFormat)
+ table.storage.locationUri.foreach(sd.setLocation)
+ table.storage.inputFormat.foreach(sd.setInputFormat)
+ table.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- table.serde.foreach(serdeInfo.setSerializationLib)
+ table.storage.serde.foreach(serdeInfo.setSerializationLib)
sd.setSerdeInfo(serdeInfo)
val serdeParameters = new java.util.HashMap[String, String]()
- table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
- new Table(tTable)
+ new HiveTable(tTable)
}
@transient override lazy val statistics: Statistics = Statistics(
@@ -821,11 +836,11 @@ private[hive] case class MetastoreRelation
// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
- lazy val allPartitions = table.getAllPartitions
+ private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(table)
def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
- table.getPartitions(predicates)
+ client.getPartitionsByFilter(table, predicates)
} else {
allPartitions
}
@@ -834,23 +849,22 @@ private[hive] case class MetastoreRelation
val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
tPartition.setDbName(databaseName)
tPartition.setTableName(tableName)
- tPartition.setValues(p.values.asJava)
+ tPartition.setValues(p.spec.values.toList.asJava)
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tPartition.setSd(sd)
- sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
-
- sd.setLocation(p.storage.location)
- sd.setInputFormat(p.storage.inputFormat)
- sd.setOutputFormat(p.storage.outputFormat)
+ sd.setCols(table.schema.map(toHiveColumn).asJava)
+ p.storage.locationUri.foreach(sd.setLocation)
+ p.storage.inputFormat.foreach(sd.setInputFormat)
+ p.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
sd.setSerdeInfo(serdeInfo)
// maps and lists should be set only after all elements are ready (see HIVE-7975)
- serdeInfo.setSerializationLib(p.storage.serde)
+ p.storage.serde.foreach(serdeInfo.setSerializationLib)
val serdeParameters = new java.util.HashMap[String, String]()
- table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ table.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
@@ -877,10 +891,10 @@ private[hive] case class MetastoreRelation
hiveQlTable.getMetadata
)
- implicit class SchemaAttribute(f: HiveColumn) {
+ implicit class SchemaAttribute(f: CatalogColumn) {
def toAttribute: AttributeReference = AttributeReference(
f.name,
- HiveMetastoreTypes.toDataType(f.hiveType),
+ HiveMetastoreTypes.toDataType(f.dataType),
// Since data can be dumped in randomly with no validation, everything is nullable.
nullable = true
)(qualifiers = Seq(alias.getOrElse(tableName)))
@@ -901,19 +915,22 @@ private[hive] case class MetastoreRelation
val columnOrdinals = AttributeMap(attributes.zipWithIndex)
override def inputFiles: Array[String] = {
- val partLocations = table.getPartitions(Nil).map(_.storage.location).toArray
+ val partLocations = client
+ .getPartitionsByFilter(table, Nil)
+ .flatMap(_.storage.locationUri)
+ .toArray
if (partLocations.nonEmpty) {
partLocations
} else {
Array(
- table.location.getOrElse(
+ table.storage.locationUri.getOrElse(
sys.error(s"Could not get the location of ${table.qualifiedName}.")))
}
}
override def newInstance(): MetastoreRelation = {
- MetastoreRelation(databaseName, tableName, alias)(table)(sqlContext)
+ MetastoreRelation(databaseName, tableName, alias)(table, client, sqlContext)
}
}
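
The hunks above repeatedly replace the old flat `HiveTable` fields (location, input format, output format, serde, serde properties) with a nested `CatalogStorageFormat`. As a small sketch, the storage of a plain text table, using the same class names that appear in this patch, could be described as:

```
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

val textFileStorage = CatalogStorageFormat(
  locationUri = None,  // let Hive pick the default warehouse location
  inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
  outputFormat = Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"),
  serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
  serdeProperties = Map.empty)
```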
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 752c037a84..5801051353 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.ParseUtils._
@@ -39,7 +40,6 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
-import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.types._
import org.apache.spark.sql.AnalysisException
@@ -55,7 +55,7 @@ private[hive] case object NativePlaceholder extends LogicalPlan {
}
private[hive] case class CreateTableAsSelect(
- tableDesc: HiveTable,
+ tableDesc: CatalogTable,
child: LogicalPlan,
allowExisting: Boolean) extends UnaryNode with Command {
@@ -63,14 +63,14 @@ private[hive] case class CreateTableAsSelect(
override lazy val resolved: Boolean =
tableDesc.specifiedDatabase.isDefined &&
tableDesc.schema.nonEmpty &&
- tableDesc.serde.isDefined &&
- tableDesc.inputFormat.isDefined &&
- tableDesc.outputFormat.isDefined &&
+ tableDesc.storage.serde.isDefined &&
+ tableDesc.storage.inputFormat.isDefined &&
+ tableDesc.storage.outputFormat.isDefined &&
childrenResolved
}
private[hive] case class CreateViewAsSelect(
- tableDesc: HiveTable,
+ tableDesc: CatalogTable,
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
@@ -193,7 +193,7 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
view: ASTNode,
viewNameParts: ASTNode,
query: ASTNode,
- schema: Seq[HiveColumn],
+ schema: Seq[CatalogColumn],
properties: Map[String, String],
allowExist: Boolean,
replace: Boolean): CreateViewAsSelect = {
@@ -201,18 +201,20 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
val originalText = query.source
- val tableDesc = HiveTable(
+ val tableDesc = CatalogTable(
specifiedDatabase = dbName,
name = viewName,
+ tableType = CatalogTableType.VIRTUAL_VIEW,
schema = schema,
- partitionColumns = Seq.empty[HiveColumn],
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ serdeProperties = Map.empty[String, String]
+ ),
properties = properties,
- serdeProperties = Map[String, String](),
- tableType = VirtualView,
- location = None,
- inputFormat = None,
- outputFormat = None,
- serde = None,
+ viewOriginalText = Some(originalText),
viewText = Some(originalText))
// We need to keep the original SQL string so that if `spark.sql.nativeView` is
@@ -314,8 +316,8 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
val schema = maybeColumns.map { cols =>
// We can't specify column types when create view, so fill it with null first, and
// update it after the schema has been resolved later.
- nodeToColumns(cols, lowerCase = true).map(_.copy(hiveType = null))
- }.getOrElse(Seq.empty[HiveColumn])
+ nodeToColumns(cols, lowerCase = true).map(_.copy(dataType = null))
+ }.getOrElse(Seq.empty[CatalogColumn])
val properties = scala.collection.mutable.Map.empty[String, String]
@@ -369,19 +371,23 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
// TODO add bucket support
- var tableDesc: HiveTable = HiveTable(
+ var tableDesc: CatalogTable = CatalogTable(
specifiedDatabase = dbName,
name = tblName,
- schema = Seq.empty[HiveColumn],
- partitionColumns = Seq.empty[HiveColumn],
- properties = Map[String, String](),
- serdeProperties = Map[String, String](),
- tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
- location = None,
- inputFormat = None,
- outputFormat = None,
- serde = None,
- viewText = None)
+ tableType =
+ if (externalTable.isDefined) {
+ CatalogTableType.EXTERNAL_TABLE
+ } else {
+ CatalogTableType.MANAGED_TABLE
+ },
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ serdeProperties = Map.empty[String, String]
+ ),
+ schema = Seq.empty[CatalogColumn])
// default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
@@ -392,9 +398,10 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
}
- hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f)))
- hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f)))
- hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f)))
+ tableDesc = tableDesc.withNewStorage(
+ inputFormat = hiveSerDe.inputFormat.orElse(tableDesc.storage.inputFormat),
+ outputFormat = hiveSerDe.outputFormat.orElse(tableDesc.storage.outputFormat),
+ serde = hiveSerDe.serde.orElse(tableDesc.storage.serde))
children.collect {
case list @ Token("TOK_TABCOLLIST", _) =>
@@ -440,13 +447,13 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
// TODO support the nullFormat
case _ => assert(false)
}
- tableDesc = tableDesc.copy(
- serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala)
+ tableDesc = tableDesc.withNewStorage(
+ serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams.asScala)
case Token("TOK_TABLELOCATION", child :: Nil) =>
val location = EximUtil.relativeToAbsolutePath(hiveConf, unescapeSQLString(child.text))
- tableDesc = tableDesc.copy(location = Option(location))
+ tableDesc = tableDesc.withNewStorage(locationUri = Option(location))
case Token("TOK_TABLESERIALIZER", child :: Nil) =>
- tableDesc = tableDesc.copy(
+ tableDesc = tableDesc.withNewStorage(
serde = Option(unescapeSQLString(child.children.head.text)))
if (child.numChildren == 2) {
// This is based on the readProps(..) method in
@@ -459,59 +466,59 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
.orNull
(unescapeSQLString(prop), value)
}.toMap
- tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+ tableDesc = tableDesc.withNewStorage(
+ serdeProperties = tableDesc.storage.serdeProperties ++ serdeParams)
}
case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
child.text.toLowerCase(Locale.ENGLISH) match {
case "orc" =>
- tableDesc = tableDesc.copy(
+ tableDesc = tableDesc.withNewStorage(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
- if (tableDesc.serde.isEmpty) {
- tableDesc = tableDesc.copy(
+ if (tableDesc.storage.serde.isEmpty) {
+ tableDesc = tableDesc.withNewStorage(
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
}
case "parquet" =>
- tableDesc = tableDesc.copy(
+ tableDesc = tableDesc.withNewStorage(
inputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
- if (tableDesc.serde.isEmpty) {
- tableDesc = tableDesc.copy(
+ if (tableDesc.storage.serde.isEmpty) {
+ tableDesc = tableDesc.withNewStorage(
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
}
case "rcfile" =>
- tableDesc = tableDesc.copy(
+ tableDesc = tableDesc.withNewStorage(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
- if (tableDesc.serde.isEmpty) {
- tableDesc = tableDesc.copy(serde =
- Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
+ if (tableDesc.storage.serde.isEmpty) {
+ tableDesc = tableDesc.withNewStorage(
+ serde =
+ Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
}
case "textfile" =>
- tableDesc = tableDesc.copy(
- inputFormat =
- Option("org.apache.hadoop.mapred.TextInputFormat"),
- outputFormat =
- Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+ tableDesc = tableDesc.withNewStorage(
+ inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
case "sequencefile" =>
- tableDesc = tableDesc.copy(
+ tableDesc = tableDesc.withNewStorage(
inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
case "avro" =>
- tableDesc = tableDesc.copy(
+ tableDesc = tableDesc.withNewStorage(
inputFormat =
Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
outputFormat =
Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
- if (tableDesc.serde.isEmpty) {
- tableDesc = tableDesc.copy(
+ if (tableDesc.storage.serde.isEmpty) {
+ tableDesc = tableDesc.withNewStorage(
serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
}
@@ -522,23 +529,21 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
case Token("TOK_TABLESERIALIZER",
Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
- tableDesc = tableDesc.copy(serde = Option(unquoteString(serdeName)))
+ tableDesc = tableDesc.withNewStorage(serde = Option(unquoteString(serdeName)))
otherProps match {
case Token("TOK_TABLEPROPERTIES", list :: Nil) :: Nil =>
- tableDesc = tableDesc.copy(
- serdeProperties = tableDesc.serdeProperties ++ getProperties(list))
+ tableDesc = tableDesc.withNewStorage(
+ serdeProperties = tableDesc.storage.serdeProperties ++ getProperties(list))
case _ =>
}
case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
case list @ Token("TOK_TABLEFILEFORMAT", _) =>
- tableDesc = tableDesc.copy(
- inputFormat =
- Option(unescapeSQLString(list.children.head.text)),
- outputFormat =
- Option(unescapeSQLString(list.children(1).text)))
+ tableDesc = tableDesc.withNewStorage(
+ inputFormat = Option(unescapeSQLString(list.children.head.text)),
+ outputFormat = Option(unescapeSQLString(list.children(1).text)))
case Token("TOK_STORAGEHANDLER", _) =>
throw new AnalysisException(
"CREATE TABLE AS SELECT cannot be used for a non-native table")
@@ -678,15 +683,15 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
// This is based the getColumns methods in
// ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
- protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[HiveColumn] = {
+ protected def nodeToColumns(node: ASTNode, lowerCase: Boolean): Seq[CatalogColumn] = {
node.children.map(_.children).collect {
case Token(rawColName, Nil) :: colTypeNode :: comment =>
- val colName = if (!lowerCase) rawColName
- else rawColName.toLowerCase
- HiveColumn(
- cleanIdentifier(colName),
- nodeToTypeString(colTypeNode),
- comment.headOption.map(n => unescapeSQLString(n.text)).orNull)
+ val colName = if (!lowerCase) rawColName else rawColName.toLowerCase
+ CatalogColumn(
+ name = cleanIdentifier(colName),
+ dataType = nodeToTypeString(colTypeNode),
+ nullable = true,
+ comment.headOption.map(n => unescapeSQLString(n.text)))
}
}
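
The parser changes above replace `copy(...)` on flat fields with `withNewStorage(...)`, which copies the table and rewrites only the storage fields that are passed in. A sketch of the ORC branch in isolation, mirroring the hunk above (the helper name is illustrative):

```
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical helper mirroring the TOK_FILEFORMAT_GENERIC "orc" case above.
def withOrcFormat(tableDesc: CatalogTable): CatalogTable = {
  val withFormats = tableDesc.withNewStorage(
    inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
    outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
  // Only fill in the serde if the user did not already specify one.
  if (withFormats.storage.serde.isEmpty) {
    withFormats.withNewStorage(serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
  } else {
    withFormats
  }
}
```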
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index f681cc6704..6a0a089fd1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -18,67 +18,11 @@
package org.apache.spark.sql.hive.client
import java.io.PrintStream
-import java.util.{Map => JMap}
-import javax.annotation.Nullable
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
-private[hive] case class HiveDatabase(name: String, location: String)
-
-private[hive] abstract class TableType { val name: String }
-private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
-private[hive] case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
-private[hive] case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
-private[hive] case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }
-
-// TODO: Use this for Tables and Partitions
-private[hive] case class HiveStorageDescriptor(
- location: String,
- inputFormat: String,
- outputFormat: String,
- serde: String,
- serdeProperties: Map[String, String])
-
-private[hive] case class HivePartition(
- values: Seq[String],
- storage: HiveStorageDescriptor)
-
-private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
-private[hive] case class HiveTable(
- specifiedDatabase: Option[String],
- name: String,
- schema: Seq[HiveColumn],
- partitionColumns: Seq[HiveColumn],
- properties: Map[String, String],
- serdeProperties: Map[String, String],
- tableType: TableType,
- location: Option[String] = None,
- inputFormat: Option[String] = None,
- outputFormat: Option[String] = None,
- serde: Option[String] = None,
- viewText: Option[String] = None) {
-
- @transient
- private[client] var client: HiveClient = _
-
- private[client] def withClient(ci: HiveClient): this.type = {
- client = ci
- this
- }
-
- def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))
-
- def isPartitioned: Boolean = partitionColumns.nonEmpty
-
- def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
-
- def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
- client.getPartitionsByFilter(this, predicates)
-
- // Hive does not support backticks when passing names to the client.
- def qualifiedName: String = s"$database.$name"
-}
/**
* An externally visible interface to the Hive client. This interface is shared across both the
@@ -106,6 +50,9 @@ private[hive] trait HiveClient {
/** Returns the names of all tables in the given database. */
def listTables(dbName: String): Seq[String]
+  /** Returns the names of tables in the given database that match the given pattern. */
+ def listTables(dbName: String, pattern: String): Seq[String]
+
/** Returns the name of the active database. */
def currentDatabase: String
@@ -113,46 +60,133 @@ private[hive] trait HiveClient {
def setCurrentDatabase(databaseName: String): Unit
/** Returns the metadata for specified database, throwing an exception if it doesn't exist */
- def getDatabase(name: String): HiveDatabase = {
- getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
+ final def getDatabase(name: String): CatalogDatabase = {
+ getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name))
}
/** Returns the metadata for a given database, or None if it doesn't exist. */
- def getDatabaseOption(name: String): Option[HiveDatabase]
+ def getDatabaseOption(name: String): Option[CatalogDatabase]
+
+ /** List the names of all the databases that match the specified pattern. */
+ def listDatabases(pattern: String): Seq[String]
/** Returns the specified table, or throws [[NoSuchTableException]]. */
- def getTable(dbName: String, tableName: String): HiveTable = {
- getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
+ final def getTable(dbName: String, tableName: String): CatalogTable = {
+ getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException(dbName, tableName))
}
- /** Returns the metadata for the specified table or None if it doens't exist. */
- def getTableOption(dbName: String, tableName: String): Option[HiveTable]
+ /** Returns the metadata for the specified table or None if it doesn't exist. */
+ def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
/** Creates a view with the given metadata. */
- def createView(view: HiveTable): Unit
+ def createView(view: CatalogTable): Unit
/** Updates the given view with new metadata. */
- def alertView(view: HiveTable): Unit
+ def alertView(view: CatalogTable): Unit
/** Creates a table with the given metadata. */
- def createTable(table: HiveTable): Unit
+ def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
- /** Updates the given table with new metadata. */
- def alterTable(table: HiveTable): Unit
+ /** Drop the specified table. */
+ def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
+
+ /** Alter a table whose name matches the one specified in `table`, assuming it exists. */
+ final def alterTable(table: CatalogTable): Unit = alterTable(table.name, table)
+
+ /** Updates the given table with new metadata, optionally renaming the table. */
+ def alterTable(tableName: String, table: CatalogTable): Unit
/** Creates a new database with the given name. */
- def createDatabase(database: HiveDatabase): Unit
+ def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
+
+ /**
+ * Drop the specified database, if it exists.
+ *
+ * @param name database to drop
+ * @param ignoreIfNotExists if true, do not throw error if the database does not exist
+ * @param cascade whether to remove all associated objects such as tables and functions
+ */
+ def dropDatabase(name: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
+
+ /**
+ * Alter a database whose name matches the one specified in `database`, assuming it exists.
+ */
+ def alterDatabase(database: CatalogDatabase): Unit
+
+ /**
+ * Create one or many partitions in the given table.
+ */
+ def createPartitions(
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit
+
+ /**
+ * Drop one or many partitions in the given table.
+ *
+ * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the
+ * partitions do not already exist. The seemingly relevant flag `ifExists` in
+ * [[org.apache.hadoop.hive.metastore.PartitionDropOptions]] is not read anywhere.
+ */
+ def dropPartitions(
+ db: String,
+ table: String,
+ specs: Seq[Catalog.TablePartitionSpec]): Unit
- /** Returns the specified paritition or None if it does not exist. */
+ /**
+ * Rename one or many existing table partitions, assuming they exist.
+ */
+ def renamePartitions(
+ db: String,
+ table: String,
+ specs: Seq[Catalog.TablePartitionSpec],
+ newSpecs: Seq[Catalog.TablePartitionSpec]): Unit
+
+ /**
+ * Alter one or more table partitions whose specs match the ones specified in `newParts`,
+ * assuming the partitions exist.
+ */
+ def alterPartitions(
+ db: String,
+ table: String,
+ newParts: Seq[CatalogTablePartition]): Unit
+
+ /** Returns the specified partition, or throws [[NoSuchPartitionException]]. */
+ final def getPartition(
+ dbName: String,
+ tableName: String,
+ spec: Catalog.TablePartitionSpec): CatalogTablePartition = {
+ getPartitionOption(dbName, tableName, spec).getOrElse {
+ throw new NoSuchPartitionException(dbName, tableName, spec)
+ }
+ }
+
+ /** Returns the specified partition or None if it does not exist. */
+ final def getPartitionOption(
+ db: String,
+ table: String,
+ spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = {
+ getPartitionOption(getTable(db, table), spec)
+ }
+
+ /** Returns the specified partition or None if it does not exist. */
def getPartitionOption(
- hTable: HiveTable,
- partitionSpec: JMap[String, String]): Option[HivePartition]
+ table: CatalogTable,
+ spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition]
+
+ /** Returns all partitions for the given table. */
+ final def getAllPartitions(db: String, table: String): Seq[CatalogTablePartition] = {
+ getAllPartitions(getTable(db, table))
+ }
/** Returns all partitions for the given table. */
- def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
+ def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition]
/** Returns partitions filtered by predicates for the given table. */
- def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
+ def getPartitionsByFilter(
+ table: CatalogTable,
+ predicates: Seq[Expression]): Seq[CatalogTablePartition]
/** Loads a static partition into an existing table. */
def loadPartition(
@@ -181,6 +215,29 @@ private[hive] trait HiveClient {
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit
+ /** Create a function in an existing database. */
+ def createFunction(db: String, func: CatalogFunction): Unit
+
+  /** Drop an existing function in the database. */
+ def dropFunction(db: String, name: String): Unit
+
+ /** Rename an existing function in the database. */
+ def renameFunction(db: String, oldName: String, newName: String): Unit
+
+ /** Alter a function whose name matches the one specified in `func`, assuming it exists. */
+ def alterFunction(db: String, func: CatalogFunction): Unit
+
+ /** Return an existing function in the database, assuming it exists. */
+ final def getFunction(db: String, name: String): CatalogFunction = {
+ getFunctionOption(db, name).getOrElse(throw new NoSuchFunctionException(db, name))
+ }
+
+ /** Return an existing function in the database, or None if it doesn't exist. */
+ def getFunctionOption(db: String, name: String): Option[CatalogFunction]
+
+ /** Return the names of all functions that match the given pattern in the database. */
+ def listFunctions(db: String, pattern: String): Seq[String]
+
/** Add a jar into class loader */
def addJar(path: String): Unit
@@ -192,4 +249,5 @@ private[hive] trait HiveClient {
/** Used for testing only. Removes all metadata from this instance of Hive. */
def reset(): Unit
+
}
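
A short caller-side sketch of the two partition accessors added above: the `Option`-returning variant for callers that treat absence as a normal outcome, and the throwing variant for callers that treat it as an error. It assumes a `HiveClient` named `client` and that `Catalog.TablePartitionSpec` is a partition-column-to-value map, as the `asJava`/`asScala` conversions in `HiveClientImpl` below suggest.

```
import org.apache.spark.sql.catalyst.catalog.Catalog

val spec: Catalog.TablePartitionSpec = Map("ds" -> "2016-02-21")

// Absence is expected: use the Option-returning accessor.
client.getPartitionOption("sales", "orders", spec) match {
  case Some(part) => println(s"location: ${part.storage.locationUri.getOrElse("<default>")}")
  case None => println("partition not created yet")
}

// Absence is a caller error: getPartition throws NoSuchPartitionException instead.
val part = client.getPartition("sales", "orders", spec)
```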
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index cf1ff55c96..7a007d2acc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -18,24 +18,25 @@
package org.apache.spark.sql.hive.client
import java.io.{File, PrintStream}
-import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HTableType}
-import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
-import org.apache.hadoop.hive.ql.{metadata, Driver}
-import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri}
+import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{Logging, SparkConf, SparkException}
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -234,167 +235,184 @@ private[hive] class HiveClientImpl(
if (getDatabaseOption(databaseName).isDefined) {
state.setCurrentDatabase(databaseName)
} else {
- throw new NoSuchDatabaseException
+ throw new NoSuchDatabaseException(databaseName)
}
}
- override def createDatabase(database: HiveDatabase): Unit = withHiveState {
+ override def createDatabase(
+ database: CatalogDatabase,
+ ignoreIfExists: Boolean): Unit = withHiveState {
client.createDatabase(
- new Database(
+ new HiveDatabase(
database.name,
- "",
- new File(database.location).toURI.toString,
- new java.util.HashMap),
- true)
+ database.description,
+ database.locationUri,
+ database.properties.asJava),
+ ignoreIfExists)
}
- override def getDatabaseOption(name: String): Option[HiveDatabase] = withHiveState {
+ override def dropDatabase(
+ name: String,
+ ignoreIfNotExists: Boolean,
+ cascade: Boolean): Unit = withHiveState {
+ client.dropDatabase(name, true, ignoreIfNotExists, cascade)
+ }
+
+ override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
+ client.alterDatabase(
+ database.name,
+ new HiveDatabase(
+ database.name,
+ database.description,
+ database.locationUri,
+ database.properties.asJava))
+ }
+
+ override def getDatabaseOption(name: String): Option[CatalogDatabase] = withHiveState {
Option(client.getDatabase(name)).map { d =>
- HiveDatabase(
+ CatalogDatabase(
name = d.getName,
- location = d.getLocationUri)
+ description = d.getDescription,
+ locationUri = d.getLocationUri,
+ properties = d.getParameters.asScala.toMap)
}
}
+ override def listDatabases(pattern: String): Seq[String] = withHiveState {
+ client.getDatabasesByPattern(pattern).asScala.toSeq
+ }
+
override def getTableOption(
dbName: String,
- tableName: String): Option[HiveTable] = withHiveState {
-
+ tableName: String): Option[CatalogTable] = withHiveState {
logDebug(s"Looking up $dbName.$tableName")
-
- val hiveTable = Option(client.getTable(dbName, tableName, false))
- val converted = hiveTable.map { h =>
-
- HiveTable(
- name = h.getTableName,
+ Option(client.getTable(dbName, tableName, false)).map { h =>
+ CatalogTable(
specifiedDatabase = Option(h.getDbName),
- schema = h.getCols.asScala.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
- partitionColumns = h.getPartCols.asScala.map(f =>
- HiveColumn(f.getName, f.getType, f.getComment)),
- properties = h.getParameters.asScala.toMap,
- serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap,
+ name = h.getTableName,
tableType = h.getTableType match {
- case HTableType.MANAGED_TABLE => ManagedTable
- case HTableType.EXTERNAL_TABLE => ExternalTable
- case HTableType.VIRTUAL_VIEW => VirtualView
- case HTableType.INDEX_TABLE => IndexTable
+ case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE
+ case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE
+ case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE
+ case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW
},
- location = shim.getDataLocation(h),
- inputFormat = Option(h.getInputFormatClass).map(_.getName),
- outputFormat = Option(h.getOutputFormatClass).map(_.getName),
- serde = Option(h.getSerializationLib),
- viewText = Option(h.getViewExpandedText)).withClient(this)
+ schema = h.getCols.asScala.map(fromHiveColumn),
+ partitionColumns = h.getPartCols.asScala.map(fromHiveColumn),
+ sortColumns = Seq(),
+ numBuckets = h.getNumBuckets,
+ createTime = h.getTTable.getCreateTime.toLong * 1000,
+ lastAccessTime = h.getLastAccessTime.toLong * 1000,
+ storage = CatalogStorageFormat(
+ locationUri = shim.getDataLocation(h),
+ inputFormat = Option(h.getInputFormatClass).map(_.getName),
+ outputFormat = Option(h.getOutputFormatClass).map(_.getName),
+ serde = Option(h.getSerializationLib),
+ serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
+ ),
+ properties = h.getParameters.asScala.toMap,
+ viewOriginalText = Option(h.getViewOriginalText),
+ viewText = Option(h.getViewExpandedText))
}
- converted
}
- private def toInputFormat(name: String) =
- Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
-
- private def toOutputFormat(name: String) =
- Utils.classForName(name)
- .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
-
- private def toQlTable(table: HiveTable): metadata.Table = {
- val qlTable = new metadata.Table(table.database, table.name)
-
- qlTable.setFields(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
- qlTable.setPartCols(
- table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
- table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
- table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
-
- // set owner
- qlTable.setOwner(conf.getUser)
- // set create time
- qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
- table.location.foreach { loc => shim.setDataLocation(qlTable, loc) }
- table.inputFormat.map(toInputFormat).foreach(qlTable.setInputFormatClass)
- table.outputFormat.map(toOutputFormat).foreach(qlTable.setOutputFormatClass)
- table.serde.foreach(qlTable.setSerializationLib)
-
- qlTable
+ override def createView(view: CatalogTable): Unit = withHiveState {
+ client.createTable(toHiveViewTable(view))
}
- private def toViewTable(view: HiveTable): metadata.Table = {
- // TODO: this is duplicated with `toQlTable` except the table type stuff.
- val tbl = new metadata.Table(view.database, view.name)
- tbl.setTableType(HTableType.VIRTUAL_VIEW)
- tbl.setSerializationLib(null)
- tbl.clearSerDeInfo()
-
- // TODO: we will save the same SQL string to original and expanded text, which is different
- // from Hive.
- tbl.setViewOriginalText(view.viewText.get)
- tbl.setViewExpandedText(view.viewText.get)
-
- tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
- view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
-
- // set owner
- tbl.setOwner(conf.getUser)
- // set create time
- tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
-
- tbl
+ override def alertView(view: CatalogTable): Unit = withHiveState {
+ client.alterTable(view.qualifiedName, toHiveViewTable(view))
}
- override def createView(view: HiveTable): Unit = withHiveState {
- client.createTable(toViewTable(view))
+ override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
+ client.createTable(toHiveTable(table), ignoreIfExists)
}
- override def alertView(view: HiveTable): Unit = withHiveState {
- client.alterTable(view.qualifiedName, toViewTable(view))
+ override def dropTable(
+ dbName: String,
+ tableName: String,
+ ignoreIfNotExists: Boolean): Unit = withHiveState {
+ client.dropTable(dbName, tableName, true, ignoreIfNotExists)
}
- override def createTable(table: HiveTable): Unit = withHiveState {
- val qlTable = toQlTable(table)
- client.createTable(qlTable)
+ override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
+ val hiveTable = toHiveTable(table)
+ // Do not use `table.qualifiedName` here because this may be a rename
+ val qualifiedTableName = s"${table.database}.$tableName"
+ client.alterTable(qualifiedTableName, hiveTable)
}
- override def alterTable(table: HiveTable): Unit = withHiveState {
- val qlTable = toQlTable(table)
- client.alterTable(table.qualifiedName, qlTable)
+ override def createPartitions(
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = withHiveState {
+ val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
+ parts.foreach { s =>
+ addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+ }
+ client.createPartitions(addPartitionDesc)
+ }
+
+ override def dropPartitions(
+ db: String,
+ table: String,
+ specs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+ // TODO: figure out how to drop multiple partitions in one call
+ specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) }
+ }
+
+ override def renamePartitions(
+ db: String,
+ table: String,
+ specs: Seq[Catalog.TablePartitionSpec],
+ newSpecs: Seq[Catalog.TablePartitionSpec]): Unit = withHiveState {
+ require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+ val catalogTable = getTable(db, table)
+ val hiveTable = toHiveTable(catalogTable)
+ specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
+ val hivePart = getPartitionOption(catalogTable, oldSpec)
+ .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
+ .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) }
+ client.renamePartition(hiveTable, oldSpec.asJava, hivePart)
+ }
}
- private def toHivePartition(partition: metadata.Partition): HivePartition = {
- val apiPartition = partition.getTPartition
- HivePartition(
- values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
- storage = HiveStorageDescriptor(
- location = apiPartition.getSd.getLocation,
- inputFormat = apiPartition.getSd.getInputFormat,
- outputFormat = apiPartition.getSd.getOutputFormat,
- serde = apiPartition.getSd.getSerdeInfo.getSerializationLib,
- serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
+ override def alterPartitions(
+ db: String,
+ table: String,
+ newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
+ val hiveTable = toHiveTable(getTable(db, table))
+ client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
}
override def getPartitionOption(
- table: HiveTable,
- partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState {
-
- val qlTable = toQlTable(table)
- val qlPartition = client.getPartition(qlTable, partitionSpec, false)
- Option(qlPartition).map(toHivePartition)
+ table: CatalogTable,
+ spec: Catalog.TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
+ val hiveTable = toHiveTable(table)
+ val hivePartition = client.getPartition(hiveTable, spec.asJava, false)
+ Option(hivePartition).map(fromHivePartition)
}
- override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
- val qlTable = toQlTable(hTable)
- shim.getAllPartitions(client, qlTable).map(toHivePartition)
+ override def getAllPartitions(table: CatalogTable): Seq[CatalogTablePartition] = withHiveState {
+ val hiveTable = toHiveTable(table)
+ shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
}
override def getPartitionsByFilter(
- hTable: HiveTable,
- predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
- val qlTable = toQlTable(hTable)
- shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
+ table: CatalogTable,
+ predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
+ val hiveTable = toHiveTable(table)
+ shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition)
}
override def listTables(dbName: String): Seq[String] = withHiveState {
client.getAllTables(dbName).asScala
}
+ override def listTables(dbName: String, pattern: String): Seq[String] = withHiveState {
+ client.getTablesByPattern(dbName, pattern).asScala
+ }
+
/**
* Runs the specified SQL query using Hive.
*/
@@ -508,6 +526,34 @@ private[hive] class HiveClientImpl(
listBucketingEnabled)
}
+ override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
+ client.createFunction(toHiveFunction(func, db))
+ }
+
+ override def dropFunction(db: String, name: String): Unit = withHiveState {
+ client.dropFunction(db, name)
+ }
+
+ override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState {
+ val catalogFunc = getFunction(db, oldName).copy(name = newName)
+ val hiveFunc = toHiveFunction(catalogFunc, db)
+ client.alterFunction(db, oldName, hiveFunc)
+ }
+
+ override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState {
+ client.alterFunction(db, func.name, toHiveFunction(func, db))
+ }
+
+ override def getFunctionOption(
+ db: String,
+ name: String): Option[CatalogFunction] = withHiveState {
+ Option(client.getFunction(db, name)).map(fromHiveFunction)
+ }
+
+ override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
+ client.getFunctions(db, pattern).asScala
+ }
+
def addJar(path: String): Unit = {
val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null) {
@@ -541,4 +587,97 @@ private[hive] class HiveClientImpl(
client.dropDatabase(db, true, false, true)
}
}
+
+
+ /* -------------------------------------------------------- *
+ | Helper methods for converting to and from Hive classes |
+ * -------------------------------------------------------- */
+
+ private def toInputFormat(name: String) =
+ Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+
+ private def toOutputFormat(name: String) =
+ Utils.classForName(name)
+ .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+
+ private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+ new HiveFunction(
+ f.name,
+ db,
+ f.className,
+ null,
+ PrincipalType.USER,
+ (System.currentTimeMillis / 1000).toInt,
+ FunctionType.JAVA,
+ List.empty[ResourceUri].asJava)
+ }
+
+ private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
+ new CatalogFunction(hf.getFunctionName, hf.getClassName)
+ }
+
+ private def toHiveColumn(c: CatalogColumn): FieldSchema = {
+ new FieldSchema(c.name, c.dataType, c.comment.orNull)
+ }
+
+ private def fromHiveColumn(hc: FieldSchema): CatalogColumn = {
+ new CatalogColumn(
+ name = hc.getName,
+ dataType = hc.getType,
+ nullable = true,
+ comment = Option(hc.getComment))
+ }
+
+ private def toHiveTable(table: CatalogTable): HiveTable = {
+ val hiveTable = new HiveTable(table.database, table.name)
+ hiveTable.setTableType(table.tableType match {
+ case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE
+ case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE
+ case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE
+ case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW
+ })
+ hiveTable.setFields(table.schema.map(toHiveColumn).asJava)
+ hiveTable.setPartCols(table.partitionColumns.map(toHiveColumn).asJava)
+ // TODO: set sort columns here too
+ hiveTable.setOwner(conf.getUser)
+ hiveTable.setNumBuckets(table.numBuckets)
+ hiveTable.setCreateTime((table.createTime / 1000).toInt)
+ hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
+ table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
+ table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
+ table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
+ table.storage.serde.foreach(hiveTable.setSerializationLib)
+ table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) }
+ table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) }
+ table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) }
+ table.viewText.foreach { t => hiveTable.setViewExpandedText(t) }
+ hiveTable
+ }
+
+ private def toHiveViewTable(view: CatalogTable): HiveTable = {
+ val tbl = toHiveTable(view)
+ tbl.setTableType(HiveTableType.VIRTUAL_VIEW)
+ tbl.setSerializationLib(null)
+ tbl.clearSerDeInfo()
+ tbl
+ }
+
+ private def toHivePartition(
+ p: CatalogTablePartition,
+ ht: HiveTable): HivePartition = {
+ new HivePartition(ht, p.spec.asJava, p.storage.locationUri.map { l => new Path(l) }.orNull)
+ }
+
+ private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
+ val apiPartition = hp.getTPartition
+ CatalogTablePartition(
+ spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
+ storage = CatalogStorageFormat(
+ locationUri = Option(apiPartition.getSd.getLocation),
+ inputFormat = Option(apiPartition.getSd.getInputFormat),
+ outputFormat = Option(apiPartition.getSd.getOutputFormat),
+ serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
+ serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
+ }
+
}
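
The helper block above follows one consistent pattern: catalog-side `Option` fields map to Hive's nullable fields, and catalog timestamps kept in epoch milliseconds are truncated to whole seconds for Hive. A minimal, self-contained sketch of that pattern, using simplified stand-in case classes (not the real `CatalogColumn`/`FieldSchema` types) so it runs without Hive on the classpath:

```scala
// Simplified stand-ins for illustration only; the real classes live in
// org.apache.spark.sql.catalyst.catalog and the Hive metastore API.
case class SimpleCatalogColumn(name: String, dataType: String, comment: Option[String] = None)
case class SimpleHiveColumn(name: String, hiveType: String, comment: String /* null if absent */)

object ConversionSketch {
  // Catalog -> Hive: Option[String] comments become nullable Strings.
  def toHive(c: SimpleCatalogColumn): SimpleHiveColumn =
    SimpleHiveColumn(c.name, c.dataType, c.comment.orNull)

  // Hive -> Catalog: nullable fields are wrapped back into Option.
  def fromHive(h: SimpleHiveColumn): SimpleCatalogColumn =
    SimpleCatalogColumn(h.name, h.hiveType, Option(h.comment))

  // Timestamps: the catalog keeps epoch milliseconds, Hive stores whole
  // seconds, so a round trip loses sub-second precision.
  def toHiveSeconds(createTimeMs: Long): Int = (createTimeMs / 1000).toInt

  def main(args: Array[String]): Unit = {
    val col = SimpleCatalogColumn("ip", "string", Some("IP Address of the User"))
    assert(fromHive(toHive(col)) == col)
    assert(toHiveSeconds(1456099200123L) == 1456099200)
  }
}
```
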
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 4c0aae6c04..3f81c99c41 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation}
-import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
/**
* Create table and insert the query result into it.
@@ -33,7 +33,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
*/
private[hive]
case class CreateTableAsSelect(
- tableDesc: HiveTable,
+ tableDesc: CatalogTable,
query: LogicalPlan,
allowExisting: Boolean)
extends RunnableCommand {
@@ -51,25 +51,25 @@ case class CreateTableAsSelect(
import org.apache.hadoop.mapred.TextInputFormat
val withFormat =
- tableDesc.copy(
+ tableDesc.withNewStorage(
inputFormat =
- tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
+ tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
outputFormat =
- tableDesc.outputFormat
+ tableDesc.storage.outputFormat
.orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
- serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName())))
+ serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)))
val withSchema = if (withFormat.schema.isEmpty) {
// Hive doesn't support specifying the column list for target table in CTAS
// However we don't think SparkSQL should follow that.
- tableDesc.copy(schema =
- query.output.map(c =>
- HiveColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType), null)))
+ tableDesc.copy(schema = query.output.map { c =>
+ CatalogColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType))
+ })
} else {
withFormat
}
- hiveContext.catalog.client.createTable(withSchema)
+ hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false)
// Get the Metastore Relation
hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
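
Note that the `withNewStorage` call above only fills in Hive's plain-text defaults for storage fields the user left unset; anything explicitly specified is preserved via `orElse`. A small sketch of that defaulting, using a hypothetical simplified case class in place of `CatalogStorageFormat`:

```scala
// Hypothetical, simplified stand-in for CatalogStorageFormat, used only to
// illustrate the orElse-based defaulting applied in CreateTableAsSelect.
case class StorageSketch(
    inputFormat: Option[String] = None,
    outputFormat: Option[String] = None,
    serde: Option[String] = None)

object CtasDefaultsSketch {
  // Fill in the plain-text defaults only for the fields left unset.
  def withDefaults(s: StorageSketch): StorageSketch = s.copy(
    inputFormat = s.inputFormat.orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
    outputFormat = s.outputFormat
      .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
    serde = s.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))

  def main(args: Array[String]): Unit = {
    // A user-specified serde is preserved; the missing formats get the defaults.
    val result = withDefaults(StorageSketch(serde = Some("my.custom.SerDe")))
    assert(result.serde == Some("my.custom.SerDe"))
    assert(result.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
  }
}
```
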
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 5da58a73e1..2914d03749 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -21,11 +21,11 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder}
-import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder}
/**
* Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
@@ -34,7 +34,7 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different
// from Hive and may not work for some cases like create view on self join.
private[hive] case class CreateViewAsSelect(
- tableDesc: HiveTable,
+ tableDesc: CatalogTable,
child: LogicalPlan,
allowExisting: Boolean,
orReplace: Boolean) extends RunnableCommand {
@@ -72,7 +72,7 @@ private[hive] case class CreateViewAsSelect(
Seq.empty[Row]
}
- private def prepareTable(sqlContext: SQLContext): HiveTable = {
+ private def prepareTable(sqlContext: SQLContext): CatalogTable = {
val expandedText = if (sqlContext.conf.canonicalView) {
try rebuildViewQueryString(sqlContext) catch {
case NonFatal(e) => wrapViewTextWithSelect
@@ -83,12 +83,16 @@ private[hive] case class CreateViewAsSelect(
val viewSchema = {
if (tableDesc.schema.isEmpty) {
- childSchema.map { attr =>
- HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+ childSchema.map { a =>
+ CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType))
}
} else {
- childSchema.zip(tableDesc.schema).map { case (attr, col) =>
- HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+ childSchema.zip(tableDesc.schema).map { case (a, col) =>
+ CatalogColumn(
+ col.name,
+ HiveMetastoreTypes.toMetastoreType(a.dataType),
+ nullable = true,
+ col.comment)
}
}
}
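
The schema branch above keeps each declared column's name and comment but takes its data type from the child plan's output by zipping the two sequences. A minimal stand-alone sketch of that alignment step, with simplified placeholder types instead of `Attribute` and `CatalogColumn`:

```scala
// Minimal sketch of the column-alignment step in CreateViewAsSelect, using
// hypothetical simplified types in place of Attribute and CatalogColumn.
case class OutputAttr(name: String, dataType: String)
case class ViewColumn(name: String, dataType: String, comment: Option[String] = None)

object ViewSchemaSketch {
  def alignSchema(childOutput: Seq[OutputAttr], declared: Seq[ViewColumn]): Seq[ViewColumn] =
    if (declared.isEmpty) {
      // No declared schema: take names and types straight from the child plan.
      childOutput.map(a => ViewColumn(a.name, a.dataType))
    } else {
      // Declared schema: keep the declared name and comment, but use the
      // child's data type for each position.
      childOutput.zip(declared).map { case (a, col) =>
        ViewColumn(col.name, a.dataType, col.comment)
      }
    }

  def main(args: Array[String]): Unit = {
    val child = Seq(OutputAttr("c0", "int"), OutputAttr("c1", "string"))
    val declared = Seq(ViewColumn("id", "bigint", Some("row id")), ViewColumn("name", "string"))
    // Declared names and comments win; child types win.
    assert(alignSchema(child, declared) ==
      Seq(ViewColumn("id", "int", Some("row id")), ViewColumn("name", "string", None)))
  }
}
```
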
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index feb133d448..d316664241 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -205,7 +205,7 @@ case class InsertIntoHiveTable(
val oldPart =
catalog.client.getPartitionOption(
catalog.client.getTable(table.databaseName, table.tableName),
- partitionSpec.asJava)
+ partitionSpec)
if (oldPart.isEmpty || !ifNotExists) {
catalog.client.loadPartition(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
new file mode 100644
index 0000000000..f73e7e2351
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
+import org.apache.spark.util.Utils
+
+
+/**
+ * Test suite for the [[HiveCatalog]].
+ */
+class HiveCatalogSuite extends CatalogTestCases {
+
+ private val client: HiveClient = {
+ IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+ hadoopVersion = VersionInfo.getVersion).createClient()
+ }
+
+ protected override val tableInputFormat: String =
+ "org.apache.hadoop.mapred.SequenceFileInputFormat"
+ protected override val tableOutputFormat: String =
+ "org.apache.hadoop.mapred.SequenceFileOutputFormat"
+
+ protected override def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+
+ protected override def resetState(): Unit = client.reset()
+
+ protected override def newEmptyCatalog(): Catalog = new HiveCatalog(client)
+
+}
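
`HiveCatalogSuite` only supplies the implementation-specific pieces; everything else comes from the shared `CatalogTestCases`. As a hypothetical sketch (not part of this patch, which may already contain an equivalent suite, and assuming `InMemoryCatalog` has a no-argument constructor), the same contract could be wired to the in-memory implementation like this:

```scala
// Hypothetical sketch: wiring the shared CatalogTestCases to the in-memory
// implementation, mirroring the overrides in HiveCatalogSuite above.
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.util.Utils

class InMemoryCatalogSketchSuite extends CatalogTestCases {

  // Any valid format class names work for the in-memory implementation;
  // these mirror the SequenceFile formats used by the Hive-backed suite.
  protected override val tableInputFormat: String =
    "org.apache.hadoop.mapred.SequenceFileInputFormat"
  protected override val tableOutputFormat: String =
    "org.apache.hadoop.mapred.SequenceFileOutputFormat"

  // Each test database gets a fresh temporary directory as its location.
  protected override def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath

  // A brand-new in-memory catalog needs no external state reset.
  protected override def resetState(): Unit = ()

  // The only implementation-specific piece: how to build an empty catalog.
  protected override def newEmptyCatalog(): Catalog = new InMemoryCatalog
}
```
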
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 14a83d5390..f8764d4725 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{QueryTest, Row, SaveMode, SQLConf}
-import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
@@ -83,16 +83,16 @@ class DataSourceWithHiveMetastoreCatalogSuite
}
val hiveTable = catalog.client.getTable("default", "t")
- assert(hiveTable.inputFormat === Some(inputFormat))
- assert(hiveTable.outputFormat === Some(outputFormat))
- assert(hiveTable.serde === Some(serde))
+ assert(hiveTable.storage.inputFormat === Some(inputFormat))
+ assert(hiveTable.storage.outputFormat === Some(outputFormat))
+ assert(hiveTable.storage.serde === Some(serde))
- assert(!hiveTable.isPartitioned)
- assert(hiveTable.tableType === ManagedTable)
+ assert(hiveTable.partitionColumns.isEmpty)
+ assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE)
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
- assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string"))
+ assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
checkAnswer(table("t"), testDF)
assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
@@ -114,16 +114,17 @@ class DataSourceWithHiveMetastoreCatalogSuite
}
val hiveTable = catalog.client.getTable("default", "t")
- assert(hiveTable.inputFormat === Some(inputFormat))
- assert(hiveTable.outputFormat === Some(outputFormat))
- assert(hiveTable.serde === Some(serde))
+ assert(hiveTable.storage.inputFormat === Some(inputFormat))
+ assert(hiveTable.storage.outputFormat === Some(outputFormat))
+ assert(hiveTable.storage.serde === Some(serde))
- assert(hiveTable.tableType === ExternalTable)
- assert(hiveTable.location.get === path.toURI.toString.stripSuffix(File.separator))
+ assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
+ assert(hiveTable.storage.locationUri ===
+ Some(path.toURI.toString.stripSuffix(File.separator)))
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
- assert(columns.map(_.hiveType) === Seq("decimal(10,3)", "string"))
+ assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
checkAnswer(table("t"), testDF)
assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
@@ -143,17 +144,16 @@ class DataSourceWithHiveMetastoreCatalogSuite
""".stripMargin)
val hiveTable = catalog.client.getTable("default", "t")
- assert(hiveTable.inputFormat === Some(inputFormat))
- assert(hiveTable.outputFormat === Some(outputFormat))
- assert(hiveTable.serde === Some(serde))
+ assert(hiveTable.storage.inputFormat === Some(inputFormat))
+ assert(hiveTable.storage.outputFormat === Some(outputFormat))
+ assert(hiveTable.storage.serde === Some(serde))
- assert(hiveTable.isPartitioned === false)
- assert(hiveTable.tableType === ExternalTable)
- assert(hiveTable.partitionColumns.length === 0)
+ assert(hiveTable.partitionColumns.isEmpty)
+ assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
- assert(columns.map(_.hiveType) === Seq("int", "string"))
+ assert(columns.map(_.dataType) === Seq("int", "string"))
checkAnswer(table("t"), Row(1, "val_1"))
assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
index 137dadd6c6..e869c0e2bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -22,15 +22,15 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.JsonTuple
import org.apache.spark.sql.catalyst.parser.SimpleParserConf
import org.apache.spark.sql.catalyst.plans.logical.Generate
-import org.apache.spark.sql.hive.client.{ExternalTable, HiveColumn, HiveTable, ManagedTable}
class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
val parser = new HiveQl(SimpleParserConf())
- private def extractTableDesc(sql: String): (HiveTable, Boolean) = {
+ private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
parser.parsePlan(sql).collect {
case CreateTableAsSelect(desc, child, allowExisting) => (desc, allowExisting)
}.head
@@ -53,28 +53,29 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
|AS SELECT * FROM src""".stripMargin
val (desc, exists) = extractTableDesc(s1)
- assert(exists == true)
+ assert(exists)
assert(desc.specifiedDatabase == Some("mydb"))
assert(desc.name == "page_view")
- assert(desc.tableType == ExternalTable)
- assert(desc.location == Some("/user/external/page_view"))
+ assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+ assert(desc.storage.locationUri == Some("/user/external/page_view"))
assert(desc.schema ==
- HiveColumn("viewtime", "int", null) ::
- HiveColumn("userid", "bigint", null) ::
- HiveColumn("page_url", "string", null) ::
- HiveColumn("referrer_url", "string", null) ::
- HiveColumn("ip", "string", "IP Address of the User") ::
- HiveColumn("country", "string", "country of origination") :: Nil)
+ CatalogColumn("viewtime", "int") ::
+ CatalogColumn("userid", "bigint") ::
+ CatalogColumn("page_url", "string") ::
+ CatalogColumn("referrer_url", "string") ::
+ CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
+ CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
// TODO will be SQLText
assert(desc.viewText == Option("This is the staging page view table"))
assert(desc.partitionColumns ==
- HiveColumn("dt", "string", "date type") ::
- HiveColumn("hour", "string", "hour of the day") :: Nil)
- assert(desc.serdeProperties ==
+ CatalogColumn("dt", "string", comment = Some("date type")) ::
+ CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+ assert(desc.storage.serdeProperties ==
Map((serdeConstants.SERIALIZATION_FORMAT, "\054"), (serdeConstants.FIELD_DELIM, "\054")))
- assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
- assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
- assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
+ assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+ assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+ assert(desc.storage.serde ==
+ Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
}
@@ -98,27 +99,27 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
|AS SELECT * FROM src""".stripMargin
val (desc, exists) = extractTableDesc(s2)
- assert(exists == true)
+ assert(exists)
assert(desc.specifiedDatabase == Some("mydb"))
assert(desc.name == "page_view")
- assert(desc.tableType == ExternalTable)
- assert(desc.location == Some("/user/external/page_view"))
+ assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
+ assert(desc.storage.locationUri == Some("/user/external/page_view"))
assert(desc.schema ==
- HiveColumn("viewtime", "int", null) ::
- HiveColumn("userid", "bigint", null) ::
- HiveColumn("page_url", "string", null) ::
- HiveColumn("referrer_url", "string", null) ::
- HiveColumn("ip", "string", "IP Address of the User") ::
- HiveColumn("country", "string", "country of origination") :: Nil)
+ CatalogColumn("viewtime", "int") ::
+ CatalogColumn("userid", "bigint") ::
+ CatalogColumn("page_url", "string") ::
+ CatalogColumn("referrer_url", "string") ::
+ CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
+ CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
// TODO will be SQLText
assert(desc.viewText == Option("This is the staging page view table"))
assert(desc.partitionColumns ==
- HiveColumn("dt", "string", "date type") ::
- HiveColumn("hour", "string", "hour of the day") :: Nil)
- assert(desc.serdeProperties == Map())
- assert(desc.inputFormat == Option("parquet.hive.DeprecatedParquetInputFormat"))
- assert(desc.outputFormat == Option("parquet.hive.DeprecatedParquetOutputFormat"))
- assert(desc.serde == Option("parquet.hive.serde.ParquetHiveSerDe"))
+ CatalogColumn("dt", "string", comment = Some("date type")) ::
+ CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
+ assert(desc.storage.serdeProperties == Map())
+ assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
+ assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
+ assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe"))
assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
}
@@ -128,14 +129,15 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(exists == false)
assert(desc.specifiedDatabase == None)
assert(desc.name == "page_view")
- assert(desc.tableType == ManagedTable)
- assert(desc.location == None)
- assert(desc.schema == Seq.empty[HiveColumn])
+ assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+ assert(desc.storage.locationUri == None)
+ assert(desc.schema == Seq.empty[CatalogColumn])
assert(desc.viewText == None) // TODO will be SQLText
- assert(desc.serdeProperties == Map())
- assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
- assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
- assert(desc.serde.isEmpty)
+ assert(desc.storage.serdeProperties == Map())
+ assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
+ assert(desc.storage.outputFormat ==
+ Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+ assert(desc.storage.serde.isEmpty)
assert(desc.properties == Map())
}
@@ -162,14 +164,14 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(exists == false)
assert(desc.specifiedDatabase == None)
assert(desc.name == "ctas2")
- assert(desc.tableType == ManagedTable)
- assert(desc.location == None)
- assert(desc.schema == Seq.empty[HiveColumn])
+ assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
+ assert(desc.storage.locationUri == None)
+ assert(desc.schema == Seq.empty[CatalogColumn])
assert(desc.viewText == None) // TODO will be SQLText
- assert(desc.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
- assert(desc.inputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
- assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
- assert(desc.serde == Option("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
+ assert(desc.storage.serdeProperties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2")))
+ assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+ assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+ assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d9e4b020fd..0c288bdf8a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -25,9 +25,9 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
-import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
@@ -724,20 +724,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val tableName = "spark6655"
withTable(tableName) {
val schema = StructType(StructField("int", IntegerType, true) :: Nil)
- val hiveTable = HiveTable(
+ val hiveTable = CatalogTable(
specifiedDatabase = Some("default"),
name = tableName,
+ tableType = CatalogTableType.MANAGED_TABLE,
schema = Seq.empty,
- partitionColumns = Seq.empty,
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ serdeProperties = Map(
+ "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
+ ),
properties = Map(
"spark.sql.sources.provider" -> "json",
"spark.sql.sources.schema" -> schema.json,
- "EXTERNAL" -> "FALSE"),
- tableType = ManagedTable,
- serdeProperties = Map(
- "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
+ "EXTERNAL" -> "FALSE"))
- catalog.client.createTable(hiveTable)
+ catalog.client.createTable(hiveTable, ignoreIfExists = false)
invalidateTable(tableName)
val actualSchema = table(tableName).schema
@@ -916,7 +921,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// As a proxy for verifying that the table was stored in Hive compatible format, we verify that
// each column of the table is of native type StringType.
assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema
- .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == StringType))
+ .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
catalog.createDataSourceTable(
tableIdent = TableIdentifier("skip_hive_metadata"),
@@ -930,6 +935,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// As a proxy for verifying that the table was stored in SparkSQL format, we verify that
// the table has a column type as array of StringType.
assert(catalog.client.getTable("default", "skip_hive_metadata").schema
- .forall(column => HiveMetastoreTypes.toDataType(column.hiveType) == ArrayType(StringType)))
+ .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index c2c896e5f6..488f298981 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -26,9 +26,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
private def checkTablePath(dbName: String, tableName: String): Unit = {
val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName)
- val expectedPath = hiveContext.catalog.client.getDatabase(dbName).location + "/" + tableName
+ val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
- assert(metastoreTable.serdeProperties("path") === expectedPath)
+ assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
}
test(s"saveAsTable() to non-default database - with USE - Overwrite") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 1344a2cc4b..d850d522be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveContext
@@ -60,8 +61,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
hadoopVersion = VersionInfo.getVersion,
config = buildConf(),
ivyPath = ivyPath).createClient()
- val db = new HiveDatabase("default", "")
- badClient.createDatabase(db)
+ val db = new CatalogDatabase("default", "desc", "loc", Map())
+ badClient.createDatabase(db, ignoreIfExists = true)
}
private def getNestedMessages(e: Throwable): String = {
@@ -116,29 +117,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test(s"$version: createDatabase") {
- val db = HiveDatabase("default", "")
- client.createDatabase(db)
+ val db = CatalogDatabase("default", "desc", "loc", Map())
+ client.createDatabase(db, ignoreIfExists = true)
}
test(s"$version: createTable") {
val table =
- HiveTable(
+ CatalogTable(
specifiedDatabase = Option("default"),
name = "src",
- schema = Seq(HiveColumn("key", "int", "")),
- partitionColumns = Seq.empty,
- properties = Map.empty,
- serdeProperties = Map.empty,
- tableType = ManagedTable,
- location = None,
- inputFormat =
- Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
- outputFormat =
- Some(classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
- serde =
- Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()))
-
- client.createTable(table)
+ tableType = CatalogTableType.MANAGED_TABLE,
+ schema = Seq(CatalogColumn("key", "int")),
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
+ outputFormat = Some(
+ classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+ serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()),
+ serdeProperties = Map.empty
+ ))
+
+ client.createTable(table, ignoreIfExists = false)
}
test(s"$version: getTable") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index b91248bfb3..37c01792d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -149,7 +149,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
val (actualScannedColumns, actualPartValues) = plan.collect {
case p @ HiveTableScan(columns, relation, _) =>
val columnNames = columns.map(_.name)
- val partValues = if (relation.table.isPartitioned) {
+ val partValues = if (relation.table.partitionColumns.nonEmpty) {
p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
} else {
Seq.empty