author    Xiao Li <gatorsmile@gmail.com>          2017-03-15 10:53:58 +0800
committer Wenchen Fan <wenchen@databricks.com>    2017-03-15 10:53:58 +0800
commit    f9a93b1b4a20e7c72d900362b269edab66e73dd8 (patch)
tree      f816b258082a9a6f567bda055d1070de6b3ff370 /sql/hive
parent    d1f6c64c4b763c05d6d79ae5497f298dc3835f3e (diff)
[SPARK-18112][SQL] Support reading data from Hive 2.1 metastore
### What changes were proposed in this pull request?

This PR is to support reading data from a Hive 2.1 metastore. The shim class needs to be updated because of Hive API changes introduced by the following three Hive JIRAs:

- [HIVE-12730 MetadataUpdater: provide a mechanism to edit the basic statistics of a table (or a partition)](https://issues.apache.org/jira/browse/HIVE-12730)
- [HIVE-13341 Stats state is not captured correctly: differentiate load table and create table](https://issues.apache.org/jira/browse/HIVE-13341)
- [HIVE-13622 WriteSet tracking optimizations](https://issues.apache.org/jira/browse/HIVE-13622)

Three new parameters were added to these Hive APIs:

- `boolean hasFollowingStatsTask`. We always set it to `false`. This keeps the existing behavior (starting from 0.13) unchanged, no matter which Hive metastore client version users choose. If we set it to `true`, Hive does not collect the basic table statistics. For example:

```SQL
CREATE TABLE tbl AS SELECT 1 AS a
```

When `hasFollowingStatsTask` is set to `false`, the table properties look like:

```
Properties: [numFiles=1, transient_lastDdlTime=1489513927, totalSize=2]
```

When `hasFollowingStatsTask` is set to `true`, the table properties look like:

```
Properties: [transient_lastDdlTime=1489513563]
```

- `AcidUtils.Operation operation`. We do not support ACID, so we set it to `AcidUtils.Operation.NOT_ACID`.

- `EnvironmentContext environmentContext`. So far, this is always set to `null`. It was introduced to support the DDL `alter table s update statistics set ('numRows'='NaN')`, which lets users specify statistics directly. Spark SQL does not need it, because we store our generated statistics values in separate table properties. However, when Spark SQL issues ALTER TABLE DDL statements, the Hive metastore automatically invalidates the Hive-generated statistics. In a follow-up PR, we can fix this by explicitly adding a property to `environmentContext`:

```JAVA
putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.USER)
```

Another alternative is to set `DO_NOT_UPDATE_STATS` to `TRUE`; see the Hive JIRA https://issues.apache.org/jira/browse/HIVE-15653. We will not address it in this PR.

### How was this patch tested?

Added test cases to VersionsSuite.scala.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17232 from gatorsmile/Hive21.
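Beyond the one-line snippet above, a minimal Scala sketch of what that follow-up could look like in the shim; the helper name is hypothetical and this is an assumption about the future fix, not part of this patch:

```scala
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.api.EnvironmentContext

// Hypothetical follow-up sketch (not part of this patch): build an EnvironmentContext
// that marks the statistics as user-generated, so the Hive metastore does not reset
// its own stats when Spark SQL issues ALTER TABLE.
private def statsPreservingContext(): EnvironmentContext = {
  val context = new EnvironmentContext()
  // STATS_GENERATED = USER tells Hive the stats are managed by the caller (HIVE-12730).
  context.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.USER)
  context
}

// Shim_v2_1.alterTable would then pass the context instead of null:
// alterTableMethod.invoke(hive, tableName, table, statsPreservingContext())
```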
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala          1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala        5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala            181
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala               6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala        19
7 files changed, 190 insertions, 25 deletions
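Before the per-file diffs, a brief usage note: with this patch a user can point Spark at a Hive 2.1.x metastore through the existing `spark.sql.hive.metastore.*` options. The sketch below is illustrative only, assuming the matching client jars are resolved from Maven:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative configuration: use the new v2_1 shim to talk to a Hive 2.1.x metastore.
val spark = SparkSession.builder()
  .appName("hive-2.1-metastore-example")
  .config("spark.sql.hive.metastore.version", "2.1.1")
  .config("spark.sql.hive.metastore.jars", "maven") // download matching client jars
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SHOW TABLES").show()
```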
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8a3c81ac8b..33b21be372 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.lang.reflect.InvocationTargetException
-import java.net.URI
import java.util
import scala.collection.mutable
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 6e1f429286..989fdc5564 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -97,6 +97,7 @@ private[hive] class HiveClientImpl(
case hive.v1_1 => new Shim_v1_1()
case hive.v1_2 => new Shim_v1_2()
case hive.v2_0 => new Shim_v2_0()
+ case hive.v2_1 => new Shim_v2_1()
}
// Create an internal session state for this HiveClientImpl.
@@ -455,7 +456,7 @@ private[hive] class HiveClientImpl(
val hiveTable = toHiveTable(table, Some(conf))
// Do not use `table.qualifiedName` here because this may be a rename
val qualifiedTableName = s"${table.database}.$tableName"
- client.alterTable(qualifiedTableName, hiveTable)
+ shim.alterTable(client, qualifiedTableName, hiveTable)
}
override def createPartitions(
@@ -535,7 +536,7 @@ private[hive] class HiveClientImpl(
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
val hiveTable = toHiveTable(getTable(db, table), Some(conf))
- client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
+ shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 153f1673c9..76568f5990 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -28,8 +28,10 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{EnvironmentContext, Function => HiveFunction, FunctionType}
+import org.apache.hadoop.hive.metastore.api.{MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.io.AcidUtils
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
@@ -82,6 +84,10 @@ private[client] sealed abstract class Shim {
def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
+ def alterTable(hive: Hive, tableName: String, table: Table): Unit
+
+ def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit
+
def createPartitions(
hive: Hive,
db: String,
@@ -158,6 +164,10 @@ private[client] sealed abstract class Shim {
}
private[client] class Shim_v0_12 extends Shim with Logging {
+ // See HIVE-12224, HOLD_DDLTIME was broken as soon as it landed
+ protected lazy val holdDDLTime = JBoolean.FALSE
+ // deletes the underlying data along with metadata
+ protected lazy val deleteDataInDropIndex = JBoolean.TRUE
private lazy val startMethod =
findStaticMethod(
@@ -240,6 +250,18 @@ private[client] class Shim_v0_12 extends Shim with Logging {
classOf[String],
classOf[String],
JBoolean.TYPE)
+ private lazy val alterTableMethod =
+ findMethod(
+ classOf[Hive],
+ "alterTable",
+ classOf[String],
+ classOf[Table])
+ private lazy val alterPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "alterPartitions",
+ classOf[String],
+ classOf[JList[Partition]])
override def setCurrentSessionState(state: SessionState): Unit = {
// Starting from Hive 0.13, setCurrentSessionState will internally override
@@ -341,7 +363,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
- loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, JBoolean.FALSE)
+ loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime)
}
override def loadDynamicPartitions(
@@ -353,11 +375,11 @@ private[client] class Shim_v0_12 extends Shim with Logging {
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, JBoolean.FALSE, listBucketingEnabled: JBoolean)
+ numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean)
}
override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
- dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
+ dropIndexMethod.invoke(hive, dbName, tableName, indexName, deleteDataInDropIndex)
}
override def dropTable(
@@ -373,6 +395,14 @@ private[client] class Shim_v0_12 extends Shim with Logging {
hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
}
+ override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
+ alterTableMethod.invoke(hive, tableName, table)
+ }
+
+ override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
+ alterPartitionsMethod.invoke(hive, tableName, newParts)
+ }
+
override def dropPartition(
hive: Hive,
dbName: String,
@@ -520,7 +550,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}
FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
}
- new CatalogFunction(name, hf.getClassName, resources)
+ CatalogFunction(name, hf.getClassName, resources)
}
override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
@@ -638,6 +668,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
private[client] class Shim_v0_14 extends Shim_v0_13 {
+ // true if this is an ACID operation
+ protected lazy val isAcid = JBoolean.FALSE
+ // true if list bucketing enabled
+ protected lazy val isSkewedStoreAsSubdir = JBoolean.FALSE
+
private lazy val loadPartitionMethod =
findMethod(
classOf[Hive],
@@ -700,8 +735,8 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- JBoolean.FALSE, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
- isSrcLocal: JBoolean, JBoolean.FALSE)
+ holdDDLTime, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean, isAcid)
}
override def loadTable(
@@ -710,8 +745,8 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
tableName: String,
replace: Boolean,
isSrcLocal: Boolean): Unit = {
- loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, JBoolean.FALSE,
- isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE)
+ loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime,
+ isSrcLocal: JBoolean, isSkewedStoreAsSubdir, isAcid)
}
override def loadDynamicPartitions(
@@ -723,7 +758,7 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, JBoolean.FALSE, listBucketingEnabled: JBoolean, JBoolean.FALSE)
+ numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid)
}
override def dropTable(
@@ -752,6 +787,9 @@ private[client] class Shim_v1_0 extends Shim_v0_14 {
private[client] class Shim_v1_1 extends Shim_v1_0 {
+ // throws an exception if the index does not exist
+ protected lazy val throwExceptionInDropIndex = JBoolean.TRUE
+
private lazy val dropIndexMethod =
findMethod(
classOf[Hive],
@@ -763,13 +801,17 @@ private[client] class Shim_v1_1 extends Shim_v1_0 {
JBoolean.TYPE)
override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
- dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean, true: JBoolean)
+ dropIndexMethod.invoke(hive, dbName, tableName, indexName, throwExceptionInDropIndex,
+ deleteDataInDropIndex)
}
}
private[client] class Shim_v1_2 extends Shim_v1_1 {
+ // txnId can be 0 unless isAcid == true
+ protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L
+
private lazy val loadDynamicPartitionsMethod =
findMethod(
classOf[Hive],
@@ -806,8 +848,8 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, JBoolean.FALSE, listBucketingEnabled: JBoolean, JBoolean.FALSE,
- 0L: JLong)
+ numDP: JInteger, holdDDLTime, listBucketingEnabled: JBoolean, isAcid,
+ txnIdInLoadDynamicPartitions)
}
override def dropPartition(
@@ -872,7 +914,106 @@ private[client] class Shim_v2_0 extends Shim_v1_2 {
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
- isSrcLocal: JBoolean, JBoolean.FALSE)
+ isSrcLocal: JBoolean, isAcid)
+ }
+
+ override def loadTable(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ replace: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
+ isSkewedStoreAsSubdir, isAcid)
+ }
+
+ override def loadDynamicPartitions(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ numDP: Int,
+ listBucketingEnabled: Boolean): Unit = {
+ loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+ numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions)
+ }
+
+}
+
+private[client] class Shim_v2_1 extends Shim_v2_0 {
+
+ // true if there is any following stats task
+ protected lazy val hasFollowingStatsTask = JBoolean.FALSE
+ // TODO: Now, always set environmentContext to null. In the future, we should avoid setting
+ // hive-generated stats to -1 when altering tables by using environmentContext. See Hive-12730
+ protected lazy val environmentContextInAlterTable = null
+
+ private lazy val loadPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "loadPartition",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadTableMethod =
+ findMethod(
+ classOf[Hive],
+ "loadTable",
+ classOf[Path],
+ classOf[String],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
+ private lazy val loadDynamicPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "loadDynamicPartitions",
+ classOf[Path],
+ classOf[String],
+ classOf[JMap[String, String]],
+ JBoolean.TYPE,
+ JInteger.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JLong.TYPE,
+ JBoolean.TYPE,
+ classOf[AcidUtils.Operation])
+ private lazy val alterTableMethod =
+ findMethod(
+ classOf[Hive],
+ "alterTable",
+ classOf[String],
+ classOf[Table],
+ classOf[EnvironmentContext])
+ private lazy val alterPartitionsMethod =
+ findMethod(
+ classOf[Hive],
+ "alterPartitions",
+ classOf[String],
+ classOf[JList[Partition]],
+ classOf[EnvironmentContext])
+
+ override def loadPartition(
+ hive: Hive,
+ loadPath: Path,
+ tableName: String,
+ partSpec: JMap[String, String],
+ replace: Boolean,
+ inheritTableSpecs: Boolean,
+ isSkewedStoreAsSubdir: Boolean,
+ isSrcLocal: Boolean): Unit = {
+ loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+ inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+ isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask)
}
override def loadTable(
@@ -882,7 +1023,7 @@ private[client] class Shim_v2_0 extends Shim_v1_2 {
replace: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
- JBoolean.FALSE, JBoolean.FALSE)
+ isSkewedStoreAsSubdir, isAcid, hasFollowingStatsTask)
}
override def loadDynamicPartitions(
@@ -894,7 +1035,15 @@ private[client] class Shim_v2_0 extends Shim_v1_2 {
numDP: Int,
listBucketingEnabled: Boolean): Unit = {
loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
- numDP: JInteger, listBucketingEnabled: JBoolean, JBoolean.FALSE, 0L: JLong)
+ numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions,
+ hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID)
}
+ override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
+ alterTableMethod.invoke(hive, tableName, table, environmentContextInAlterTable)
+ }
+
+ override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
+ alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 6f69a4adf2..e95f9ea480 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -95,6 +95,7 @@ private[hive] object IsolatedClientLoader extends Logging {
case "1.1" | "1.1.0" => hive.v1_1
case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
+ case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
}
private def downloadVersion(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 790ad74e66..f9635e3654 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -67,7 +67,11 @@ package object client {
exclusions = Seq("org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm"))
- val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0)
+ case object v2_1 extends HiveVersion("2.1.1",
+ exclusions = Seq("org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+ val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1)
}
// scalastyle:on
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b8536d0c1b..3682dc8507 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -149,7 +149,7 @@ case class InsertIntoHiveTable(
// staging directory under the table director for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
- val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0)
+ val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
// Ensure all the supported versions are considered here.
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index cb13861110..7aff49c0fc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.net.URI
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
@@ -108,7 +109,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
- private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0")
+ private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
private var client: HiveClient = null
@@ -120,10 +121,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
System.gc() // Hack to avoid SEGV on some JVM versions.
val hadoopConf = new Configuration()
hadoopConf.set("test", "success")
- // Hive changed the default of datanucleus.schema.autoCreateAll from true to false since 2.0
- // For details, see the JIRA HIVE-6113
- if (version == "2.0") {
+ // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
+ // hive.metastore.schema.verification from false to true since 2.0
+ // For details, see the JIRA HIVE-6113 and HIVE-12463
+ if (version == "2.0" || version == "2.1") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+ hadoopConf.set("hive.metastore.schema.verification", "false")
}
client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
if (versionSpark != null) versionSpark.reset()
@@ -572,6 +575,14 @@ class VersionsSuite extends SparkFunSuite with Logging {
withTable("tbl") {
versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1)))
+ val tableMeta = versionSpark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl"))
+ val totalSize = tableMeta.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+ // Except 0.12, all the following versions will fill the Hive-generated statistics
+ if (version == "0.12") {
+ assert(totalSize.isEmpty)
+ } else {
+ assert(totalSize.nonEmpty && totalSize.get > 0)
+ }
}
}