author     Sean Zhong <seanzhong@databricks.com>    2016-05-18 16:00:02 +0800
committer  Cheng Lian <lian@databricks.com>         2016-05-18 16:00:02 +0800
commit     6e02aec44b9e5bc2ada55cb612f26e6ba000c23e
tree       4c5b2a9039a833f81efdb0fa7cec5f247c59cab6
parent     411c04adb596c514f2634efd5f5d126e12b05df7
[SPARK-15334][SQL] HiveClient facade not compatible with Hive 0.12
## What changes were proposed in this pull request?

The HiveClient facade is not compatible with Hive 0.12. This PR fixes the following compatibility issues:

1. `org.apache.spark.sql.hive.client.HiveClientImpl` uses `AddPartitionDesc(db, table, ignoreIfExists)` to create partitions, but Hive 0.12 doesn't have this constructor for `AddPartitionDesc`.
2. `HiveClientImpl` uses `PartitionDropOptions` when dropping a partition, but the class `PartitionDropOptions` doesn't exist in Hive 0.12.
3. Hive 0.12 doesn't support adding permanent functions, so it is not valid to call `org.apache.hadoop.hive.ql.metadata.Hive.createFunction`, `org.apache.hadoop.hive.ql.metadata.Hive.dropFunction`, or `org.apache.hadoop.hive.ql.metadata.Hive.alterFunction`.
4. `org.apache.spark.sql.hive.client.VersionsSuite` doesn't have enough test coverage for the different Hive versions 0.12, 0.13, 0.14, 1.0.0, 1.1.0, and 1.2.0.

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13127 from clockfly/versionSuite.
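The fix pattern throughout the diff below is the same: `HiveClientImpl` stops calling version-sensitive Hive APIs directly and delegates to the `Shim` hierarchy, where the 0.12 shim either reimplements the call or fails with a clear error, while the 0.13+ shim keeps the original behaviour. The following is a minimal, self-contained sketch of that dispatch pattern; the types and the in-memory registry are illustrative stand-ins, not Spark's or Hive's actual classes (the real shims call `org.apache.hadoop.hive.ql.metadata.Hive`, often via reflection).

```scala
// Minimal sketch of the version-dispatch pattern used in the patch.
// All types here are hypothetical stand-ins for illustration only.
object ShimSketch {

  // Stand-in for Spark's CatalogFunction.
  final case class CatalogFunction(name: String, className: String)

  sealed trait Shim {
    def createFunction(db: String, func: CatalogFunction): Unit
    def listFunctions(db: String, pattern: String): Seq[String]
  }

  // Hive 0.12 has no permanent-function API, so the shim fails fast with a
  // clear message instead of letting a missing-method error escape later.
  class Shim_v0_12 extends Shim {
    override def createFunction(db: String, func: CatalogFunction): Unit =
      throw new UnsupportedOperationException(
        "Hive 0.12 doesn't support creating permanent functions")
    override def listFunctions(db: String, pattern: String): Seq[String] = Seq.empty
  }

  // Hive 0.13+ supports the call; an in-memory map stands in for the metastore here.
  class Shim_v0_13 extends Shim_v0_12 {
    private val registry = scala.collection.mutable.Map.empty[String, CatalogFunction]
    override def createFunction(db: String, func: CatalogFunction): Unit =
      registry(s"$db.${func.name}") = func
    override def listFunctions(db: String, pattern: String): Seq[String] =
      registry.keys.filter(_.matches(s"$db\\.$pattern")).toSeq
  }

  def main(args: Array[String]): Unit = {
    val shims: Seq[(String, Shim)] =
      Seq("0.12" -> new Shim_v0_12(), "0.13" -> new Shim_v0_13())
    shims.foreach { case (version, shim) =>
      try {
        shim.createFunction("default", CatalogFunction("func1", "org.example.MyFunc"))
        println(s"$version: ${shim.listFunctions("default", "fun.*")}")
      } catch {
        case e: UnsupportedOperationException => println(s"$version: ${e.getMessage}")
      }
    }
  }
}
```

Routing the calls through the shim keeps the 0.12-incompatible `AddPartitionDesc` constructor and the `PartitionDropOptions` usage out of the shared code path, so they are only exercised when a 0.13+ shim is active.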
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala  |  74
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala        | 182
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala   | 412
3 files changed, 545 insertions(+), 123 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a4e9f03b43..af2850d4f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -26,14 +26,10 @@ import scala.language.reflectiveCalls
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
-import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType}
-import org.apache.hadoop.hive.metastore.api.{NoSuchObjectException, PrincipalType}
-import org.apache.hadoop.hive.metastore.api.{ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation
@@ -41,13 +37,13 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.util.{CausedBy, CircularBuffer, Utils}
+import org.apache.spark.util.{CircularBuffer, Utils}
/**
* A class that wraps the HiveClient and converts its responses to externally visible classes.
@@ -400,11 +396,7 @@ private[hive] class HiveClientImpl(
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = withHiveState {
- val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
- parts.foreach { s =>
- addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
- }
- client.createPartitions(addPartitionDesc)
+ shim.createPartitions(client, db, table, parts, ignoreIfExists)
}
override def dropPartitions(
@@ -430,10 +422,9 @@ private[hive] class HiveClientImpl(
}.distinct
var droppedParts = ArrayBuffer.empty[java.util.List[String]]
matchingParts.foreach { partition =>
- val dropOptions = new PartitionDropOptions
- dropOptions.ifExists = ignoreIfNotExists
try {
- client.dropPartition(db, table, partition, dropOptions)
+ val deleteData = true
+ client.dropPartition(db, table, partition, deleteData)
} catch {
case e: Exception =>
val remainingParts = matchingParts.toBuffer -- droppedParts
@@ -629,37 +620,28 @@ private[hive] class HiveClientImpl(
}
override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
- client.createFunction(toHiveFunction(func, db))
+ shim.createFunction(client, db, func)
}
override def dropFunction(db: String, name: String): Unit = withHiveState {
- client.dropFunction(db, name)
+ shim.dropFunction(client, db, name)
}
override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState {
- val catalogFunc = getFunction(db, oldName)
- .copy(identifier = FunctionIdentifier(newName, Some(db)))
- val hiveFunc = toHiveFunction(catalogFunc, db)
- client.alterFunction(db, oldName, hiveFunc)
+ shim.renameFunction(client, db, oldName, newName)
}
override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState {
- client.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
+ shim.alterFunction(client, db, func)
}
override def getFunctionOption(
- db: String,
- name: String): Option[CatalogFunction] = withHiveState {
- try {
- Option(client.getFunction(db, name)).map(fromHiveFunction)
- } catch {
- case CausedBy(ex: NoSuchObjectException) if ex.getMessage.contains(name) =>
- None
- }
+ db: String, name: String): Option[CatalogFunction] = withHiveState {
+ shim.getFunctionOption(client, db, name)
}
override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
- client.getFunctions(db, pattern).asScala
+ shim.listFunctions(client, db, pattern)
}
def addJar(path: String): Unit = {
@@ -708,36 +690,6 @@ private[hive] class HiveClientImpl(
Utils.classForName(name)
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
- private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
- val resourceUris = f.resources.map { resource =>
- new ResourceUri(
- ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri)
- }
- new HiveFunction(
- f.identifier.funcName,
- db,
- f.className,
- null,
- PrincipalType.USER,
- (System.currentTimeMillis / 1000).toInt,
- FunctionType.JAVA,
- resourceUris.asJava)
- }
-
- private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
- val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
- val resources = hf.getResourceUris.asScala.map { uri =>
- val resourceType = uri.getResourceType() match {
- case ResourceType.ARCHIVE => "archive"
- case ResourceType.FILE => "file"
- case ResourceType.JAR => "jar"
- case r => throw new AnalysisException(s"Unknown resource type: $r")
- }
- FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
- }
- new CatalogFunction(name, hf.getClassName, resources)
- }
-
private def toHiveColumn(c: CatalogColumn): FieldSchema = {
new FieldSchema(c.name, c.dataType, c.comment.orNull)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 4ecf866f96..78713c3f0b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -27,15 +27,23 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, NoSuchObjectException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegralType, StringType}
+import org.apache.spark.util.CausedBy
+
/**
* A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
@@ -73,6 +81,13 @@ private[client] sealed abstract class Shim {
def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
+ def createPartitions(
+ hive: Hive,
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit
+
def loadPartition(
hive: Hive,
loadPath: Path,
@@ -100,6 +115,18 @@ private[client] sealed abstract class Shim {
holdDDLTime: Boolean,
listBucketingEnabled: Boolean): Unit
+ def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit
+
+ def dropFunction(hive: Hive, db: String, name: String): Unit
+
+ def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit
+
+ def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit
+
+ def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction]
+
+ def listFunctions(hive: Hive, db: String, pattern: String): Seq[String]
+
def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
@@ -112,7 +139,6 @@ private[client] sealed abstract class Shim {
protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
klass.getMethod(name, args: _*)
}
-
}
private[client] class Shim_v0_12 extends Shim with Logging {
@@ -144,6 +170,22 @@ private[client] class Shim_v0_12 extends Shim with Logging {
classOf[Driver],
"getResults",
classOf[JArrayList[String]])
+ private lazy val createPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "createPartition",
+ classOf[Table],
+ classOf[JMap[String, String]],
+ classOf[Path],
+ classOf[JMap[String, String]],
+ classOf[String],
+ classOf[String],
+ JInteger.TYPE,
+ classOf[JList[Object]],
+ classOf[String],
+ classOf[JMap[String, String]],
+ classOf[JList[Object]],
+ classOf[JList[Object]])
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
@@ -199,6 +241,42 @@ private[client] class Shim_v0_12 extends Shim with Logging {
override def setDataLocation(table: Table, loc: String): Unit =
setDataLocationMethod.invoke(table, new URI(loc))
+ // Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12
+ override def createPartitions(
+ hive: Hive,
+ database: String,
+ tableName: String,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = {
+ val table = hive.getTable(database, tableName)
+ parts.foreach { s =>
+ val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+ val spec = s.spec.asJava
+ if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
+ // Ignore this partition since it already exists and ignoreIfExists == true
+ } else {
+ if (location == null && table.isView()) {
+ throw new HiveException("LOCATION clause illegal for view partition");
+ }
+
+ createPartitionMethod.invoke(
+ hive,
+ table,
+ spec,
+ location,
+ null, // partParams
+ null, // inputFormat
+ null, // outputFormat
+ -1: JInteger, // numBuckets
+ null, // cols
+ null, // serializationLib
+ null, // serdeParams
+ null, // bucketCols
+ null) // sortCols
+ }
+ }
+ }
+
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
@@ -265,6 +343,30 @@ private[client] class Shim_v0_12 extends Shim with Logging {
dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
}
+ override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+ throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " +
+ "Please use Hive 0.13 or higher.")
+ }
+
+ def dropFunction(hive: Hive, db: String, name: String): Unit = {
+ throw new NoSuchPermanentFunctionException(db, name)
+ }
+
+ def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
+ throw new NoSuchPermanentFunctionException(db, oldName)
+ }
+
+ def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+ throw new NoSuchPermanentFunctionException(db, func.identifier.funcName)
+ }
+
+ def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
+ None
+ }
+
+ def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
+ Seq.empty[String]
+ }
}
private[client] class Shim_v0_13 extends Shim_v0_12 {
@@ -308,9 +410,85 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
override def setDataLocation(table: Table, loc: String): Unit =
setDataLocationMethod.invoke(table, new Path(loc))
+ override def createPartitions(
+ hive: Hive,
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = {
+ val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
+ parts.foreach { s =>
+ addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+ }
+ hive.createPartitions(addPartitionDesc)
+ }
+
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
+ private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+ val resourceUris = f.resources.map { resource =>
+ new ResourceUri(
+ ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri)
+ }
+ new HiveFunction(
+ f.identifier.funcName,
+ db,
+ f.className,
+ null,
+ PrincipalType.USER,
+ (System.currentTimeMillis / 1000).toInt,
+ FunctionType.JAVA,
+ resourceUris.asJava)
+ }
+
+ override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+ hive.createFunction(toHiveFunction(func, db))
+ }
+
+ override def dropFunction(hive: Hive, db: String, name: String): Unit = {
+ hive.dropFunction(db, name)
+ }
+
+ override def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
+ val catalogFunc = getFunctionOption(hive, db, oldName)
+ .getOrElse(throw new NoSuchPermanentFunctionException(db, oldName))
+ .copy(identifier = FunctionIdentifier(newName, Some(db)))
+ val hiveFunc = toHiveFunction(catalogFunc, db)
+ hive.alterFunction(db, oldName, hiveFunc)
+ }
+
+ override def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+ hive.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
+ }
+
+ private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
+ val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
+ val resources = hf.getResourceUris.asScala.map { uri =>
+ val resourceType = uri.getResourceType() match {
+ case ResourceType.ARCHIVE => "archive"
+ case ResourceType.FILE => "file"
+ case ResourceType.JAR => "jar"
+ case r => throw new AnalysisException(s"Unknown resource type: $r")
+ }
+ FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
+ }
+ new CatalogFunction(name, hf.getClassName, resources)
+ }
+
+ override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
+ try {
+ Option(hive.getFunction(db, name)).map(fromHiveFunction)
+ } catch {
+ case CausedBy(ex: NoSuchObjectException) if ex.getMessage.contains(name) =>
+ None
+ }
+ }
+
+ override def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
+ hive.getFunctions(db, pattern).asScala
+ }
+
/**
* Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
* a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index a6a5ab3988..57e8db7e88 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -17,21 +17,27 @@
package org.apache.spark.sql.hive.client
-import java.io.File
+import java.io.{ByteArrayOutputStream, File, PrintStream}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.tags.ExtendedHiveTest
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
/**
* A simple set of tests that call the methods of a [[HiveClient]], loading different version
@@ -97,12 +103,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
private val emptyDir = Utils.createTempDir().getCanonicalPath
- private def partSpec = {
- val hashMap = new java.util.LinkedHashMap[String, String]
- hashMap.put("key", "1")
- hashMap
- }
-
// Its actually pretty easy to mess things up and have all of your tests "pass" by accidentally
// connecting to an auto-populated, in-process metastore. Let's make sure we are getting the
// versions right by forcing a known compatibility failure.
@@ -122,7 +122,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
- private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0")
+ private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2")
private var client: HiveClient = null
@@ -130,110 +130,402 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: create client") {
client = null
System.gc() // Hack to avoid SEGV on some JVM versions.
+ val hadoopConf = new Configuration();
+ hadoopConf.set("test", "success")
client =
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
- hadoopConf = new Configuration(),
+ hadoopConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}
+ def table(database: String, tableName: String): CatalogTable = {
+ CatalogTable(
+ identifier = TableIdentifier(tableName, Some(database)),
+ tableType = CatalogTableType.MANAGED,
+ schema = Seq(CatalogColumn("key", "int")),
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = Some(classOf[TextInputFormat].getName),
+ outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+ serde = Some(classOf[LazySimpleSerDe].getName()),
+ compressed = false,
+ serdeProperties = Map.empty
+ ))
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Database related API
+ ///////////////////////////////////////////////////////////////////////////
+
+ val tempDatabasePath = Utils.createTempDir().getCanonicalPath
+
test(s"$version: createDatabase") {
- val db = CatalogDatabase("default", "desc", "loc", Map())
- client.createDatabase(db, ignoreIfExists = true)
+ val defaultDB = CatalogDatabase("default", "desc", "loc", Map())
+ client.createDatabase(defaultDB, ignoreIfExists = true)
+ val tempDB = CatalogDatabase(
+ "temporary", description = "test create", tempDatabasePath, Map())
+ client.createDatabase(tempDB, ignoreIfExists = true)
+ }
+
+ test(s"$version: setCurrentDatabase") {
+ client.setCurrentDatabase("default")
+ }
+
+ test(s"$version: getDatabase") {
+ // No exception should be thrown
+ client.getDatabase("default")
+ }
+
+ test(s"$version: getDatabaseOption") {
+ assert(client.getDatabaseOption("default").isDefined)
+ assert(client.getDatabaseOption("nonexist") == None)
}
+ test(s"$version: listDatabases") {
+ assert(client.listDatabases("defau.*") == Seq("default"))
+ }
+
+ test(s"$version: alterDatabase") {
+ val database = client.getDatabase("temporary").copy(properties = Map("flag" -> "true"))
+ client.alterDatabase(database)
+ assert(client.getDatabase("temporary").properties.contains("flag"))
+ }
+
+ test(s"$version: dropDatabase") {
+ assert(client.getDatabaseOption("temporary").isDefined)
+ client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = true)
+ assert(client.getDatabaseOption("temporary").isEmpty)
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Table related API
+ ///////////////////////////////////////////////////////////////////////////
+
test(s"$version: createTable") {
- val table =
- CatalogTable(
- identifier = TableIdentifier("src", Some("default")),
- tableType = CatalogTableType.MANAGED,
- schema = Seq(CatalogColumn("key", "int")),
- storage = CatalogStorageFormat(
- locationUri = None,
- inputFormat = Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
- outputFormat = Some(
- classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
- serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()),
- compressed = false,
- serdeProperties = Map.empty
- ))
-
- client.createTable(table, ignoreIfExists = false)
+ client.createTable(table("default", tableName = "src"), ignoreIfExists = false)
+ client.createTable(table("default", "temporary"), ignoreIfExists = false)
+ }
+
+ test(s"$version: loadTable") {
+ client.loadTable(
+ emptyDir,
+ tableName = "src",
+ replace = false,
+ holdDDLTime = false)
}
test(s"$version: getTable") {
+ // No exception should be thrown
client.getTable("default", "src")
}
- test(s"$version: listTables") {
- assert(client.listTables("default") === Seq("src"))
+ test(s"$version: getTableOption") {
+ assert(client.getTableOption("default", "src").isDefined)
}
- test(s"$version: getDatabase") {
- client.getDatabase("default")
+ test(s"$version: alterTable(table: CatalogTable)") {
+ val newTable = client.getTable("default", "src").copy(properties = Map("changed" -> ""))
+ client.alterTable(newTable)
+ assert(client.getTable("default", "src").properties.contains("changed"))
}
- test(s"$version: alterTable") {
- client.alterTable(client.getTable("default", "src"))
+ test(s"$version: alterTable(tableName: String, table: CatalogTable)") {
+ val newTable = client.getTable("default", "src").copy(properties = Map("changedAgain" -> ""))
+ client.alterTable("src", newTable)
+ assert(client.getTable("default", "src").properties.contains("changedAgain"))
}
- test(s"$version: set command") {
- client.runSqlHive("SET spark.sql.test.key=1")
+ test(s"$version: listTables(database)") {
+ assert(client.listTables("default") === Seq("src", "temporary"))
+ }
+
+ test(s"$version: listTables(database, pattern)") {
+ assert(client.listTables("default", pattern = "src") === Seq("src"))
+ assert(client.listTables("default", pattern = "nonexist").isEmpty)
+ }
+
+ test(s"$version: dropTable") {
+ client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false)
+ assert(client.listTables("default") === Seq("src"))
}
- test(s"$version: create partitioned table DDL") {
- client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key INT)")
- client.runSqlHive("ALTER TABLE src_part ADD PARTITION (key = '1')")
+ ///////////////////////////////////////////////////////////////////////////
+ // Partition related API
+ ///////////////////////////////////////////////////////////////////////////
+
+ val storageFormat = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ compressed = false,
+ serdeProperties = Map.empty)
+
+ test(s"$version: sql create partitioned table") {
+ client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
}
- test(s"$version: getPartitions") {
- client.getPartitions(client.getTable("default", "src_part"))
+ test(s"$version: createPartitions") {
+ val partition1 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "1"), storageFormat)
+ val partition2 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "2"), storageFormat)
+ client.createPartitions(
+ "default", "src_part", Seq(partition1, partition2), ignoreIfExists = true)
+ }
+
+ test(s"$version: getPartitions(catalogTable)") {
+ assert(2 == client.getPartitions(client.getTable("default", "src_part")).size)
}
test(s"$version: getPartitionsByFilter") {
- client.getPartitionsByFilter(client.getTable("default", "src_part"), Seq(EqualTo(
- AttributeReference("key", IntegerType, false)(NamedExpression.newExprId),
- Literal(1))))
+ // Only one partition [1, 1] for key2 == 1
+ val result = client.getPartitionsByFilter(client.getTable("default", "src_part"),
+ Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
+
+ // Hive 0.12 doesn't support getPartitionsByFilter, it ignores the filter condition.
+ if (version != "0.12") {
+ assert(result.size == 1)
+ }
+ }
+
+ test(s"$version: getPartition") {
+ // No exception should be thrown
+ client.getPartition("default", "src_part", Map("key1" -> "1", "key2" -> "2"))
+ }
+
+ test(s"$version: getPartitionOption(db: String, table: String, spec: TablePartitionSpec)") {
+ val partition = client.getPartitionOption(
+ "default", "src_part", Map("key1" -> "1", "key2" -> "2"))
+ assert(partition.isDefined)
+ }
+
+ test(s"$version: getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") {
+ val partition = client.getPartitionOption(
+ client.getTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2"))
+ assert(partition.isDefined)
+ }
+
+ test(s"$version: getPartitions(db: String, table: String)") {
+ assert(2 == client.getPartitions("default", "src_part", None).size)
}
test(s"$version: loadPartition") {
+ val partSpec = new java.util.LinkedHashMap[String, String]
+ partSpec.put("key1", "1")
+ partSpec.put("key2", "2")
+
client.loadPartition(
emptyDir,
"default.src_part",
partSpec,
- false,
- false,
- false,
- false)
- }
-
- test(s"$version: loadTable") {
- client.loadTable(
- emptyDir,
- "src",
- false,
- false)
+ replace = false,
+ holdDDLTime = false,
+ inheritTableSpecs = false,
+ isSkewedStoreAsSubdir = false)
}
test(s"$version: loadDynamicPartitions") {
+ val partSpec = new java.util.LinkedHashMap[String, String]
+ partSpec.put("key1", "1")
+ partSpec.put("key2", "") // Dynamic partition
+
client.loadDynamicPartitions(
emptyDir,
"default.src_part",
partSpec,
- false,
- 1,
+ replace = false,
+ numDP = 1,
false,
false)
}
- test(s"$version: create index and reset") {
+ test(s"$version: renamePartitions") {
+ val oldSpec = Map("key1" -> "1", "key2" -> "1")
+ val newSpec = Map("key1" -> "1", "key2" -> "3")
+ client.renamePartitions("default", "src_part", Seq(oldSpec), Seq(newSpec))
+
+ // Checks the existence of the new partition (key1 = 1, key2 = 3)
+ assert(client.getPartitionOption("default", "src_part", newSpec).isDefined)
+ }
+
+ test(s"$version: alterPartitions") {
+ val spec = Map("key1" -> "1", "key2" -> "2")
+ val newLocation = Utils.createTempDir().getPath()
+ val storage = storageFormat.copy(locationUri = Some(newLocation))
+ val partition = CatalogTablePartition(spec, storage)
+ client.alterPartitions("default", "src_part", Seq(partition))
+ assert(client.getPartition("default", "src_part", spec)
+ .storage.locationUri == Some(newLocation))
+ }
+
+ test(s"$version: dropPartitions") {
+ val spec = Map("key1" -> "1", "key2" -> "3")
+ client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true)
+ assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Function related API
+ ///////////////////////////////////////////////////////////////////////////
+
+ def function(name: String, className: String): CatalogFunction = {
+ CatalogFunction(
+ FunctionIdentifier(name, Some("default")), className, Seq.empty[FunctionResource])
+ }
+
+ test(s"$version: createFunction") {
+ val functionClass = "org.apache.spark.MyFunc1"
+ if (version == "0.12") {
+ // Hive 0.12 doesn't support creating permanent functions
+ intercept[AnalysisException] {
+ client.createFunction("default", function("func1", functionClass))
+ }
+ } else {
+ client.createFunction("default", function("func1", functionClass))
+ }
+ }
+
+ test(s"$version: functionExists") {
+ if (version == "0.12") {
+ // Hive 0.12 doesn't allow customized permanent functions
+ assert(client.functionExists("default", "func1") == false)
+ } else {
+ assert(client.functionExists("default", "func1") == true)
+ }
+ }
+
+ test(s"$version: renameFunction") {
+ if (version == "0.12") {
+ // Hive 0.12 doesn't allow customized permanent functions
+ intercept[NoSuchPermanentFunctionException] {
+ client.renameFunction("default", "func1", "func2")
+ }
+ } else {
+ client.renameFunction("default", "func1", "func2")
+ assert(client.functionExists("default", "func2") == true)
+ }
+ }
+
+ test(s"$version: alterFunction") {
+ val functionClass = "org.apache.spark.MyFunc2"
+ if (version == "0.12") {
+ // Hive 0.12 doesn't allow customized permanent functions
+ intercept[NoSuchPermanentFunctionException] {
+ client.alterFunction("default", function("func2", functionClass))
+ }
+ } else {
+ client.alterFunction("default", function("func2", functionClass))
+ }
+ }
+
+ test(s"$version: getFunction") {
+ if (version == "0.12") {
+ // Hive 0.12 doesn't allow customized permanent functions
+ intercept[NoSuchPermanentFunctionException] {
+ client.getFunction("default", "func2")
+ }
+ } else {
+ // No exception should be thrown
+ val func = client.getFunction("default", "func2")
+ assert(func.className == "org.apache.spark.MyFunc2")
+ }
+ }
+
+ test(s"$version: getFunctionOption") {
+ if (version == "0.12") {
+ // Hive 0.12 doesn't allow customized permanent functions
+ assert(client.getFunctionOption("default", "func2").isEmpty)
+ } else {
+ assert(client.getFunctionOption("default", "func2").isDefined)
+ }
+ }
+
+ test(s"$version: listFunctions") {
+ if (version == "0.12") {
+ // Hive 0.12 doesn't allow customized permanent functions
+ assert(client.listFunctions("default", "fun.*").isEmpty)
+ } else {
+ assert(client.listFunctions("default", "fun.*").size == 1)
+ }
+ }
+
+ test(s"$version: dropFunction") {
+ if (version == "0.12") {
+ // Hive 0.12 doesn't support creating permanent functions
+ intercept[NoSuchPermanentFunctionException] {
+ client.dropFunction("default", "func2")
+ }
+ } else {
+ // No exception should be thrown
+ client.dropFunction("default", "func2")
+ assert(client.listFunctions("default", "fun.*").size == 0)
+ }
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // SQL related API
+ ///////////////////////////////////////////////////////////////////////////
+
+ test(s"$version: sql set command") {
+ client.runSqlHive("SET spark.sql.test.key=1")
+ }
+
+ test(s"$version: sql create index and reset") {
client.runSqlHive("CREATE TABLE indexed_table (key INT)")
client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
"as 'COMPACT' WITH DEFERRED REBUILD")
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Miscellaneous API
+ ///////////////////////////////////////////////////////////////////////////
+
+ test(s"$version: version") {
+ assert(client.version.fullVersion.startsWith(version))
+ }
+
+ test(s"$version: getConf") {
+ assert("success" === client.getConf("test", null))
+ }
+
+ test(s"$version: setOut") {
+ client.setOut(new PrintStream(new ByteArrayOutputStream()))
+ }
+
+ test(s"$version: setInfo") {
+ client.setInfo(new PrintStream(new ByteArrayOutputStream()))
+ }
+
+ test(s"$version: setError") {
+ client.setError(new PrintStream(new ByteArrayOutputStream()))
+ }
+
+ test(s"$version: newSession") {
+ val newClient = client.newSession()
+ assert(newClient != null)
+ }
+
+ test(s"$version: withHiveState and addJar") {
+ val newClassPath = "."
+ client.addJar(newClassPath)
+ client.withHiveState {
+ // No exception should be thrown.
+ // withHiveState changes the classloader to MutableURLClassLoader
+ val classLoader = Thread.currentThread().getContextClassLoader
+ .asInstanceOf[MutableURLClassLoader]
+
+ val urls = classLoader.getURLs()
+ urls.contains(new File(newClassPath).toURI.toURL)
+ }
+ }
+
+ test(s"$version: reset") {
+ // Clears all database, tables, functions...
client.reset()
+ assert(client.listTables("default").isEmpty)
}
}
}