Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala           |   1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala         |   5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala               | 181
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala   |   1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala                |   6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala |   2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala          |  19
7 files changed, 190 insertions, 25 deletions
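This patch adds Hive 2.1.x to the metastore versions Spark can talk to. The bulk of the change is a new Shim_v2_1 in HiveShim.scala: Hive 2.1 changed the signatures of Hive.loadTable, loadPartition, loadDynamicPartitions, alterTable and alterPartitions, so each shim resolves the overload for its own version via reflection and supplies version-specific arguments through named constants (holdDDLTime, isAcid, hasFollowingStatsTask, ...) rather than bare JBoolean literals. A minimal sketch of that pattern, using a simplified findMethod that stands in for the real helper in HiveShim.scala:

    import java.lang.reflect.Method
    import org.apache.hadoop.hive.ql.metadata.{Hive, Table}

    object ShimSketch {
      // Simplified stand-in for HiveShim's findMethod: resolve a Hive method once,
      // by name and parameter types, so Spark compiles against a single Hive jar
      // but can still call overloads that only exist in some versions.
      def findMethod(klass: Class[_], name: String, args: Class[_]*): Method =
        klass.getMethod(name, args: _*)

      // Hive 0.12 - 2.0 expose alterTable(String, Table); Hive 2.1 adds an
      // EnvironmentContext parameter, so Shim_v2_1 resolves a different overload
      // and passes null for the extra argument.
      private lazy val alterTableMethod: Method =
        findMethod(classOf[Hive], "alterTable", classOf[String], classOf[Table])

      def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
        alterTableMethod.invoke(hive, tableName, table)
      }
    }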
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8a3c81ac8b..33b21be372 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.lang.reflect.InvocationTargetException
-import java.net.URI
import java.util
import scala.collection.mutable
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 6e1f429286..989fdc5564 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -97,6 +97,7 @@ private[hive] class HiveClientImpl(
case hive.v1_1 => new Shim_v1_1()
case hive.v1_2 => new Shim_v1_2()
case hive.v2_0 => new Shim_v2_0()
+ case hive.v2_1 => new Shim_v2_1()
}
// Create an internal session state for this HiveClientImpl.
@@ -455,7 +456,7 @@ private[hive] class HiveClientImpl(
val hiveTable = toHiveTable(table, Some(conf))
// Do not use `table.qualifiedName` here because this may be a rename
val qualifiedTableName = s"${table.database}.$tableName"
- client.alterTable(qualifiedTableName, hiveTable)
+ shim.alterTable(client, qualifiedTableName, hiveTable)
}
override def createPartitions(
@@ -535,7 +536,7 @@ private[hive] class HiveClientImpl(
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
val hiveTable = toHiveTable(getTable(db, table), Some(conf))
- client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
+ shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 153f1673c9..76568f5990 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -28,8 +28,10 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{EnvironmentContext, Function => HiveFunction, FunctionType}
+import org.apache.hadoop.hive.metastore.api.{MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.io.AcidUtils
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
@@ -82,6 +84,10 @@ private[client] sealed abstract class Shim {
def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
+ def alterTable(hive: Hive, tableName: String, table: Table): Unit
+
+ def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit
+
def createPartitions(
hive: Hive,
db: String,
@@ -158,6 +164,10 @@ private[client] sealed abstract class Shim {
}
private[client] class Shim_v0_12 extends Shim with Logging {
+ // See HIVE-12224, HOLD_DDLTIME was broken as soon as it landed
+ protected lazy val holdDDLTime = JBoolean.FALSE
+ // deletes the underlying data along with metadata
+ protected lazy val deleteDataInDropIndex = JBoolean.TRUE
private lazy val startMethod =
findStaticMethod(
@@ -240,6 +250,18 @@ private[client] class Shim_v0_12 extends Shim with Logging {
classOf[String],
classOf[String],
JBoolean.TYPE)
+ private lazy val alterTableMethod =
+ findMethod(
+ classOf[Hive],
+ "alterTable",
+ classOf[String],
+ classOf[Table])
+ private lazy val alterPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "alterPartitions",
+ classOf[String],
+ classOf[JList[Partition]])
override def setCurrentSessionState(state: SessionState): Unit = {
// Starting from Hive 0.13, setCurrentSessionState will internally override
@@ -341,7 +363,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
- loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, JBoolean.FALSE)
+ loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime)
}
override def loadDynamicPartitions(
@@ -353,11 +375,11 @@ private[client] class Shim_v0_12 extends Shim with Logging {
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, JBoolean.FALSE, listBucketingEnabled: JBoolean)
+ numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean)
}
override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
- dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
+ dropIndexMethod.invoke(hive, dbName, tableName, indexName, deleteDataInDropIndex)
}
override def dropTable(
@@ -373,6 +395,14 @@ private[client] class Shim_v0_12 extends Shim with Logging {
hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
}
+ override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
+ alterTableMethod.invoke(hive, tableName, table)
+ }
+
+ override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
+ alterPartitionsMethod.invoke(hive, tableName, newParts)
+ }
+
override def dropPartition(
hive: Hive,
dbName: String,
@@ -520,7 +550,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}
FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
}
- new CatalogFunction(name, hf.getClassName, resources)
+ CatalogFunction(name, hf.getClassName, resources)
}
override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
@@ -638,6 +668,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
private[client] class Shim_v0_14 extends Shim_v0_13 {
+ // true if this is an ACID operation
+ protected lazy val isAcid = JBoolean.FALSE
+ // true if list bucketing enabled
+ protected lazy val isSkewedStoreAsSubdir = JBoolean.FALSE
+
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
@@ -700,8 +735,8 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- JBoolean.FALSE, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
- isSrcLocal: JBoolean, JBoolean.FALSE)
+ holdDDLTime, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean, isAcid)
}
override def loadTable(
@@ -710,8 +745,8 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
- loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, JBoolean.FALSE,
- isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE)
+ loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime,
+ isSrcLocal: JBoolean, isSkewedStoreAsSubdir, isAcid)
}
override def loadDynamicPartitions(
@@ -723,7 +758,7 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, JBoolean.FALSE, listBucketingEnabled: JBoolean, JBoolean.FALSE)
+ numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid)
}
override def dropTable(
@@ -752,6 +787,9 @@ private[client] class Shim_v1_0 extends Shim_v0_14 {
private[client] class Shim_v1_1 extends Shim_v1_0 {
+ // throws an exception if the index does not exist
+ protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
+
private lazy val dropIndexMethod =
findMethod(
classOf[Hive],
@@ -763,13 +801,17 @@ private[client] class Shim_v1_1 extends Shim_v1_0 {
JBoolean.TYPE)
override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
- dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean, true: JBoolean)
+ dropIndexMethod.invoke(hive, dbName, tableName, indexName, throwExceptionInDropIndex,
+ deleteDataInDropIndex)
}
}
private[client] class Shim_v1_2 extends Shim_v1_1 {
+ // txnId can be 0 unless isAcid == true
+ protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
+
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
@@ -806,8 +848,8 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, JBoolean.FALSE, listBucketingEnabled: JBoolean, JBoolean.FALSE,
- 0L: JLong)
+ numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid,
+ txnIdInLoadDynamicPartitions)
}
override def dropPartition(
@@ -872,7 +914,106 @@ private[client] class Shim_v2_0 extends Shim_v1_2 {
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
- isSrcLocal: JBoolean, JBoolean.FALSE)
+ isSrcLocal: JBoolean, isAcid)
+ }
+
+ override def loadTable(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ replace: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
+ isSkewedStoreAsSubdir, isAcid)
+ }
+
+ override def loadDynamicPartitions(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ numDP: Int,
+ listBucketingEnabled: Boolean): Unit = {
+ loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+ numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions)
+ }
+
+}
+
+private[client] class Shim_v2_1 extends Shim_v2_0 {
+
+ // true if there is any following stats task
+ protected lazy val hasFollowingStatsTask = JBoolean.FALSE
+ // TODO: Now, always set environmentContext to null. In the future, we should avoid setting
+ // hive-generated stats to -1 when altering tables by using environmentContext. See HIVE-12730
+ protected lazy val environmentContextInAlterTable = null
+
+ private lazy val loadPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "loadPartition",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadTableMethod =
+ findMethod(
+ classOf[Hive],
+ "loadTable",
+ classOf[Path],
+ classOf[String],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadDynamicPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "loadDynamicPartitions",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ JBoolean.TYPE,
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JLong.TYPE,
+ JBoolean.TYPE,
+ classOf[AcidUtils.Operation])
+ private lazy val alterTableMethod =
+ findMethod(
+ classOf[Hive],
+ "alterTable",
+ classOf[String],
+ classOf[Table],
+ classOf[EnvironmentContext])
+ private lazy val alterPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "alterPartitions",
+ classOf[String],
+ classOf[JList[Partition]],
+ classOf[EnvironmentContext])
+
+ override def loadPartition(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+ inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask)
}
override def loadTable(
@@ -882,7 +1023,7 @@ private[client] class Shim_v2_0 extends Shim_v1_2 {
replace: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
- JBoolean.FALSE, JBoolean.FALSE)
+ isSkewedStoreAsSubdir, isAcid, hasFollowingStatsTask)
}
override def loadDynamicPartitions(
@@ -894,7 +1035,15 @@ private[client] class Shim_v2_0 extends Shim_v1_2 {
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, listBucketingEnabled: JBoolean, JBoolean.FALSE, 0L: JLong)
+ numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions,
+ hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID)
}
+ override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
+ alterTableMethod.invoke(hive, tableName, table, environmentContextInAlterTable)
+ }
+
+ override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
+ alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
+ }
}
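Shim_v2_1 deliberately passes null for the EnvironmentContext parameter that Hive 2.1's alterTable/alterPartitions overloads take (see the TODO referencing HIVE-12730 above). As a hedged sketch of the intended follow-up, not something this patch wires up: the context can carry hints such as StatsSetupConst.DO_NOT_UPDATE_STATS, which would keep the metastore from resetting Hive-generated statistics to -1 when Spark alters a table.

    import org.apache.hadoop.hive.common.StatsSetupConst
    import org.apache.hadoop.hive.metastore.api.EnvironmentContext

    // Hypothetical follow-up: build a context once and pass it where
    // environmentContextInAlterTable is currently null.
    val doNotUpdateStats = new EnvironmentContext()
    doNotUpdateStats.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, "true")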
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 6f69a4adf2..e95f9ea480 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -95,6 +95,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case "1.1" | "1.1.0" => hive.v1_1
case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
+ case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
}
private def downloadVersion(
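With the version mapping above in place, an application can point Spark's isolated client loader at a Hive 2.1 metastore through the usual configuration keys; a minimal sketch, with illustrative values:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hive-2.1-metastore")
      // resolved to hive.v2_1 by the version mapping added above
      .config("spark.sql.hive.metastore.version", "2.1.1")
      // fetch matching Hive client jars from Maven, or point to a local classpath
      .config("spark.sql.hive.metastore.jars", "maven")
      .enableHiveSupport()
      .getOrCreate()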
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 790ad74e66..f9635e3654 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -67,7 +67,11 @@ package object client {
exclusions = Seq("org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm"))
- val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0)
+ case object v2_1 extends HiveVersion("2.1.1",
+ exclusions = Seq("org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+ val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1)
}
// scalastyle:on
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b8536d0c1b..3682dc8507 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -149,7 +149,7 @@ case class InsertIntoHiveTable(
// staging directory under the table directory for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
- val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0)
+ val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
// Ensure all the supported versions are considered here.
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index cb13861110..7aff49c0fc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.net.URI
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
@@ -108,7 +109,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
- private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0")
+ private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
private var client: HiveClient = null
@@ -120,10 +121,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
System.gc() // Hack to avoid SEGV on some JVM versions.
val hadoopConf = new Configuration()
hadoopConf.set("test", "success")
- // Hive changed the default of datanucleus.schema.autoCreateAll from true to false since 2.0
- // For details, see the JIRA HIVE-6113
- if (version == "2.0") {
+ // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
+ // hive.metastore.schema.verification from false to true since 2.0
+ // For details, see the JIRA HIVE-6113 and HIVE-12463
+ if (version == "2.0" || version == "2.1") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+ hadoopConf.set("hive.metastore.schema.verification", "false")
}
client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
if (versionSpark != null) versionSpark.reset()
@@ -572,6 +575,14 @@ class VersionsSuite extends SparkFunSuite with Logging {
withTable("tbl") {
versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1)))
+ val tableMeta = versionSpark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl"))
+ val totalSize = tableMeta.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+ // Except for 0.12, all the following versions will fill the Hive-generated statistics
+ if (version == "0.12") {
+ assert(totalSize.isEmpty)
+ } else {
+ assert(totalSize.nonEmpty && totalSize.get > 0)
+ }
}
}