author    Andrew Or <andrew@databricks.com>    2016-03-24 22:59:35 -0700
committer Andrew Or <andrew@databricks.com>    2016-03-24 22:59:35 -0700
commit    20ddf5fddf40b543edc61d6e4687988489dea64c (patch)
tree      9bfac66a6e754af29b06ce5529a473795c005a8e
parent    1c70b7650f21fc51a07db1e4f28cebbc1fb47e94 (diff)
[SPARK-14014][SQL] Integrate session catalog (attempt #2)
## What changes were proposed in this pull request?

This reopens #11836, which was merged but promptly reverted because it introduced flaky Hive tests.

## How was this patch tested?

See `CatalogTestCases`, `SessionCatalogSuite` and `HiveContextSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #11938 from andrewor14/session-catalog-again.
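Conceptually, the patch routes table resolution through a per-session catalog that layers temporary tables over the external Hive catalog (temporary tables shadow metastore tables unless the name is database-qualified). The sketch below is a minimal illustration of that layering using simplified stand-in types; `ExternalCatalog`, `SessionCatalog`, `TableIdent`, and the string "plans" here are hypothetical, not Spark's actual classes. In the real diff, `HiveSessionCatalog` plays the session-catalog role and delegates metastore access to `HiveMetastoreCatalog` and `HiveCatalog`.

```scala
// Minimal sketch of the session-catalog layering, with simplified stand-in types.
import scala.collection.mutable

case class TableIdent(db: Option[String], table: String)

// Persistent metadata store (backed by the Hive metastore in the real patch).
trait ExternalCatalog {
  def tableExists(db: String, table: String): Boolean
  def lookup(db: String, table: String): String // returns a plan placeholder
}

// Per-session catalog: temporary tables shadow external tables,
// unless the lookup is database-qualified.
class SessionCatalog(external: ExternalCatalog, currentDb: String = "default") {
  private val tempTables = mutable.Map.empty[String, String]

  def createTempTable(name: String, plan: String): Unit =
    tempTables(name.toLowerCase) = plan

  def lookupRelation(ident: TableIdent): String = {
    val table = ident.table.toLowerCase
    if (ident.db.isDefined || !tempTables.contains(table)) {
      external.lookup(ident.db.getOrElse(currentDb), table)
    } else {
      tempTables(table)
    }
  }
}

object Demo extends App {
  val external = new ExternalCatalog {
    def tableExists(db: String, table: String): Boolean = true
    def lookup(db: String, table: String): String = s"MetastoreRelation($db.$table)"
  }
  val catalog = new SessionCatalog(external)
  catalog.createTempTable("t", "TempRelation(t)")
  println(catalog.lookupRelation(TableIdent(None, "t")))            // TempRelation(t)
  println(catalog.lookupRelation(TableIdent(Some("default"), "t"))) // MetastoreRelation(default.t)
}
```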
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala | 23
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala | 5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 498
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 60
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 104
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala | 3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 14
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 172
-rw-r--r--  sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java | 5
-rw-r--r--  sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala | 38
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala | 11
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala | 37
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala | 7
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala | 17
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala | 15
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 32
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala | 5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala | 12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala | 7
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 25
33 files changed, 755 insertions, 417 deletions
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 05f59f1545..650797f768 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -60,16 +60,19 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
}
override def afterAll() {
- TestHive.cacheTables = false
- TimeZone.setDefault(originalTimeZone)
- Locale.setDefault(originalLocale)
- TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
- TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
- TestHive.sessionState.functionRegistry.restore()
-
- // For debugging dump some statistics about how much time was spent in various optimizer rules.
- logWarning(RuleExecutor.dumpTimeSpent())
- super.afterAll()
+ try {
+ TestHive.cacheTables = false
+ TimeZone.setDefault(originalTimeZone)
+ Locale.setDefault(originalLocale)
+ TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
+ TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
+ TestHive.sessionState.functionRegistry.restore()
+
+ // For debugging dump some statistics about how much time was spent in various optimizer rules.
+ logWarning(RuleExecutor.dumpTimeSpent())
+ } finally {
+ super.afterAll()
+ }
}
/** A list of tests deemed out of scope currently and thus completely disregarded. */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
index 491f2aebb4..0722fb02a8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
@@ -85,7 +85,6 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
withClient { getTable(db, table) }
}
-
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
@@ -182,6 +181,10 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
client.getTable(db, table)
}
+ override def tableExists(db: String, table: String): Boolean = withClient {
+ client.getTableOption(db, table).isDefined
+ }
+
override def listTables(db: String): Seq[String] = withClient {
requireDbExists(db)
client.listTables(db)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 914f8e9a98..ca3ce43591 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.language.implicitConversions
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.common.`type`.HiveDecimal
@@ -38,7 +39,7 @@ import org.apache.hadoop.hive.ql.parse.VariableSubstitution
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
@@ -52,6 +53,7 @@ import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
import org.apache.spark.sql.types._
@@ -67,7 +69,7 @@ private[hive] case class CurrentDatabase(ctx: HiveContext)
override def foldable: Boolean = true
override def nullable: Boolean = false
override def eval(input: InternalRow): Any = {
- UTF8String.fromString(ctx.metadataHive.currentDatabase)
+ UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase)
}
}
@@ -81,15 +83,31 @@ class HiveContext private[hive](
sc: SparkContext,
cacheManager: CacheManager,
listener: SQLListener,
- @transient private val execHive: HiveClientImpl,
- @transient private val metaHive: HiveClient,
- isRootContext: Boolean)
- extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
+ @transient private[hive] val executionHive: HiveClientImpl,
+ @transient private[hive] val metadataHive: HiveClient,
+ isRootContext: Boolean,
+ @transient private[sql] val hiveCatalog: HiveCatalog)
+ extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
self =>
+ private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
+ this(
+ sc,
+ new CacheManager,
+ SQLContext.createListenerAndUI(sc),
+ execHive,
+ metaHive,
+ true,
+ new HiveCatalog(metaHive))
+ }
+
def this(sc: SparkContext) = {
- this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true)
+ this(
+ sc,
+ HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+ HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
}
+
def this(sc: JavaSparkContext) = this(sc.sc)
import org.apache.spark.sql.hive.HiveContext._
@@ -106,9 +124,10 @@ class HiveContext private[hive](
sc = sc,
cacheManager = cacheManager,
listener = listener,
- execHive = executionHive.newSession(),
- metaHive = metadataHive.newSession(),
- isRootContext = false)
+ executionHive = executionHive.newSession(),
+ metadataHive = metadataHive.newSession(),
+ isRootContext = false,
+ hiveCatalog = hiveCatalog)
}
@transient
@@ -149,41 +168,6 @@ class HiveContext private[hive](
*/
protected[sql] def convertCTAS: Boolean = getConf(CONVERT_CTAS)
- /**
- * The version of the hive client that will be used to communicate with the metastore. Note that
- * this does not necessarily need to be the same version of Hive that is used internally by
- * Spark SQL for execution.
- */
- protected[hive] def hiveMetastoreVersion: String = getConf(HIVE_METASTORE_VERSION)
-
- /**
- * The location of the jars that should be used to instantiate the HiveMetastoreClient. This
- * property can be one of three options:
- * - a classpath in the standard format for both hive and hadoop.
- * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
- * option is only valid when using the execution version of Hive.
- * - maven - download the correct version of hive on demand from maven.
- */
- protected[hive] def hiveMetastoreJars: String = getConf(HIVE_METASTORE_JARS)
-
- /**
- * A comma separated list of class prefixes that should be loaded using the classloader that
- * is shared between Spark SQL and a specific version of Hive. An example of classes that should
- * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
- * to be shared are those that interact with classes that are already shared. For example,
- * custom appenders that are used by log4j.
- */
- protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
- getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")
-
- /**
- * A comma separated list of class prefixes that should explicitly be reloaded for each version
- * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
- * prefix that typically would be shared (i.e. org.apache.spark.*)
- */
- protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
- getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
-
/*
* hive thrift server use background spark sql thread pool to execute sql queries
*/
@@ -196,29 +180,6 @@ class HiveContext private[hive](
protected[sql] lazy val substitutor = new VariableSubstitution()
/**
- * The copy of the hive client that is used for execution. Currently this must always be
- * Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the
- * client is used for execution related tasks like registering temporary functions or ensuring
- * that the ThreadLocal SessionState is correctly populated. This copy of Hive is *not* used
- * for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
- */
- @transient
- protected[hive] lazy val executionHive: HiveClientImpl = if (execHive != null) {
- execHive
- } else {
- logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
- val loader = new IsolatedClientLoader(
- version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
- sparkConf = sc.conf,
- execJars = Seq(),
- hadoopConf = sc.hadoopConfiguration,
- config = newTemporaryConfiguration(useInMemoryDerby = true),
- isolationOn = false,
- baseClassLoader = Utils.getContextOrSparkClassLoader)
- loader.createClient().asInstanceOf[HiveClientImpl]
- }
-
- /**
* Overrides default Hive configurations to avoid breaking changes to Spark SQL users.
* - allow SQL11 keywords to be used as identifiers
*/
@@ -228,111 +189,6 @@ class HiveContext private[hive](
defaultOverrides()
- /**
- * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore.
- * The version of the Hive client that is used here must match the metastore that is configured
- * in the hive-site.xml file.
- */
- @transient
- protected[hive] lazy val metadataHive: HiveClient = if (metaHive != null) {
- metaHive
- } else {
- val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
-
- // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
- // into the isolated client loader
- val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf])
-
- val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
- logInfo("default warehouse location is " + defaultWarehouseLocation)
-
- // `configure` goes second to override other settings.
- val allConfig = metadataConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configure
-
- val isolatedLoader = if (hiveMetastoreJars == "builtin") {
- if (hiveExecutionVersion != hiveMetastoreVersion) {
- throw new IllegalArgumentException(
- "Builtin jars can only be used when hive execution version == hive metastore version. " +
- s"Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " +
- "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
- s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.")
- }
-
- // We recursively find all jars in the class loader chain,
- // starting from the given classLoader.
- def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
- case null => Array.empty[URL]
- case urlClassLoader: URLClassLoader =>
- urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
- case other => allJars(other.getParent)
- }
-
- val classLoader = Utils.getContextOrSparkClassLoader
- val jars = allJars(classLoader)
- if (jars.length == 0) {
- throw new IllegalArgumentException(
- "Unable to locate hive jars to connect to metastore. " +
- "Please set spark.sql.hive.metastore.jars.")
- }
-
- logInfo(
- s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
- new IsolatedClientLoader(
- version = metaVersion,
- sparkConf = sc.conf,
- execJars = jars.toSeq,
- hadoopConf = sc.hadoopConfiguration,
- config = allConfig,
- isolationOn = true,
- barrierPrefixes = hiveMetastoreBarrierPrefixes,
- sharedPrefixes = hiveMetastoreSharedPrefixes)
- } else if (hiveMetastoreJars == "maven") {
- // TODO: Support for loading the jars from an already downloaded location.
- logInfo(
- s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
- IsolatedClientLoader.forVersion(
- hiveMetastoreVersion = hiveMetastoreVersion,
- hadoopVersion = VersionInfo.getVersion,
- sparkConf = sc.conf,
- hadoopConf = sc.hadoopConfiguration,
- config = allConfig,
- barrierPrefixes = hiveMetastoreBarrierPrefixes,
- sharedPrefixes = hiveMetastoreSharedPrefixes)
- } else {
- // Convert to files and expand any directories.
- val jars =
- hiveMetastoreJars
- .split(File.pathSeparator)
- .flatMap {
- case path if new File(path).getName() == "*" =>
- val files = new File(path).getParentFile().listFiles()
- if (files == null) {
- logWarning(s"Hive jar path '$path' does not exist.")
- Nil
- } else {
- files.filter(_.getName().toLowerCase().endsWith(".jar"))
- }
- case path =>
- new File(path) :: Nil
- }
- .map(_.toURI.toURL)
-
- logInfo(
- s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
- s"using ${jars.mkString(":")}")
- new IsolatedClientLoader(
- version = metaVersion,
- sparkConf = sc.conf,
- execJars = jars.toSeq,
- hadoopConf = sc.hadoopConfiguration,
- config = allConfig,
- isolationOn = true,
- barrierPrefixes = hiveMetastoreBarrierPrefixes,
- sharedPrefixes = hiveMetastoreSharedPrefixes)
- }
- isolatedLoader.createClient()
- }
-
protected[sql] override def parseSql(sql: String): LogicalPlan = {
executionHive.withHiveState {
super.parseSql(substitutor.substitute(hiveconf, sql))
@@ -432,7 +288,7 @@ class HiveContext private[hive](
// recorded in the Hive metastore.
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- sessionState.catalog.client.alterTable(
+ sessionState.catalog.alterTable(
relation.table.copy(
properties = relation.table.properties +
(StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
@@ -459,64 +315,10 @@ class HiveContext private[hive](
setConf(entry.key, entry.stringConverter(value))
}
- /** Overridden by child classes that need to set configuration before the client init. */
- protected def configure(): Map[String, String] = {
- // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
- // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
- // compatibility when users are trying to connecting to a Hive metastore of lower version,
- // because these options are expected to be integral values in lower versions of Hive.
- //
- // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according
- // to their output time units.
- Seq(
- ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS,
- ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS,
- ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS,
- ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS,
- ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS,
- ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS,
- ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS,
- ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
- ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS,
- ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS,
- ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS,
- ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS,
- ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
- ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
- ).map { case (confVar, unit) =>
- confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
- }.toMap
- }
-
/**
* SQLConf and HiveConf contracts:
*
- * 1. create a new SessionState for each HiveContext
+ * 1. create a new o.a.h.hive.ql.session.SessionState for each HiveContext
* 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
* SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
* set in the SQLConf *as well as* in the HiveConf.
@@ -600,7 +402,7 @@ class HiveContext private[hive](
}
-private[hive] object HiveContext {
+private[hive] object HiveContext extends Logging {
/** The version of hive used internally by Spark SQL. */
val hiveExecutionVersion: String = "1.2.1"
@@ -666,6 +468,242 @@ private[hive] object HiveContext {
defaultValue = Some(true),
doc = "When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+ /**
+ * The version of the hive client that will be used to communicate with the metastore. Note that
+ * this does not necessarily need to be the same version of Hive that is used internally by
+ * Spark SQL for execution.
+ */
+ private def hiveMetastoreVersion(conf: SQLConf): String = {
+ conf.getConf(HIVE_METASTORE_VERSION)
+ }
+
+ /**
+ * The location of the jars that should be used to instantiate the HiveMetastoreClient. This
+ * property can be one of three options:
+ * - a classpath in the standard format for both hive and hadoop.
+ * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
+ * option is only valid when using the execution version of Hive.
+ * - maven - download the correct version of hive on demand from maven.
+ */
+ private def hiveMetastoreJars(conf: SQLConf): String = {
+ conf.getConf(HIVE_METASTORE_JARS)
+ }
+
+ /**
+ * A comma separated list of class prefixes that should be loaded using the classloader that
+ * is shared between Spark SQL and a specific version of Hive. An example of classes that should
+ * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
+ * to be shared are those that interact with classes that are already shared. For example,
+ * custom appenders that are used by log4j.
+ */
+ private def hiveMetastoreSharedPrefixes(conf: SQLConf): Seq[String] = {
+ conf.getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")
+ }
+
+ /**
+ * A comma separated list of class prefixes that should explicitly be reloaded for each version
+ * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
+ * prefix that typically would be shared (i.e. org.apache.spark.*)
+ */
+ private def hiveMetastoreBarrierPrefixes(conf: SQLConf): Seq[String] = {
+ conf.getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
+ }
+
+ /**
+ * Configurations needed to create a [[HiveClient]].
+ */
+ private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = {
+ // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
+ // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
+ // compatibility when users are trying to connecting to a Hive metastore of lower version,
+ // because these options are expected to be integral values in lower versions of Hive.
+ //
+ // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according
+ // to their output time units.
+ Seq(
+ ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS,
+ ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS,
+ ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS,
+ ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
+ ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS,
+ ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS,
+ ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS,
+ ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS,
+ ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
+ ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
+ ).map { case (confVar, unit) =>
+ confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
+ }.toMap
+ }
+
+ /**
+ * Create a [[HiveClient]] used for execution.
+ *
+ * Currently this must always be Hive 13 as this is the version of Hive that is packaged
+ * with Spark SQL. This copy of the client is used for execution related tasks like
+ * registering temporary functions or ensuring that the ThreadLocal SessionState is
+ * correctly populated. This copy of Hive is *not* used for storing persistent metadata,
+ * and only point to a dummy metastore in a temporary directory.
+ */
+ protected[hive] def newClientForExecution(
+ conf: SparkConf,
+ hadoopConf: Configuration): HiveClientImpl = {
+ logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
+ val loader = new IsolatedClientLoader(
+ version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
+ sparkConf = conf,
+ execJars = Seq(),
+ hadoopConf = hadoopConf,
+ config = newTemporaryConfiguration(useInMemoryDerby = true),
+ isolationOn = false,
+ baseClassLoader = Utils.getContextOrSparkClassLoader)
+ loader.createClient().asInstanceOf[HiveClientImpl]
+ }
+
+ /**
+ * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
+ *
+ * The version of the Hive client that is used here must match the metastore that is configured
+ * in the hive-site.xml file.
+ */
+ private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
+ val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
+ val configurations = hiveClientConfigurations(hiveConf)
+ newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
+ }
+
+ protected[hive] def newClientForMetadata(
+ conf: SparkConf,
+ hiveConf: HiveConf,
+ hadoopConf: Configuration,
+ configurations: Map[String, String]): HiveClient = {
+ val sqlConf = new SQLConf
+ sqlConf.setConf(SQLContext.getSQLProperties(conf))
+ val hiveMetastoreVersion = HiveContext.hiveMetastoreVersion(sqlConf)
+ val hiveMetastoreJars = HiveContext.hiveMetastoreJars(sqlConf)
+ val hiveMetastoreSharedPrefixes = HiveContext.hiveMetastoreSharedPrefixes(sqlConf)
+ val hiveMetastoreBarrierPrefixes = HiveContext.hiveMetastoreBarrierPrefixes(sqlConf)
+ val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
+
+ val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir")
+ logInfo("default warehouse location is " + defaultWarehouseLocation)
+
+ // `configure` goes second to override other settings.
+ val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations
+
+ val isolatedLoader = if (hiveMetastoreJars == "builtin") {
+ if (hiveExecutionVersion != hiveMetastoreVersion) {
+ throw new IllegalArgumentException(
+ "Builtin jars can only be used when hive execution version == hive metastore version. " +
+ s"Execution: $hiveExecutionVersion != Metastore: $hiveMetastoreVersion. " +
+ "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
+ s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.")
+ }
+
+ // We recursively find all jars in the class loader chain,
+ // starting from the given classLoader.
+ def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
+ case null => Array.empty[URL]
+ case urlClassLoader: URLClassLoader =>
+ urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
+ case other => allJars(other.getParent)
+ }
+
+ val classLoader = Utils.getContextOrSparkClassLoader
+ val jars = allJars(classLoader)
+ if (jars.length == 0) {
+ throw new IllegalArgumentException(
+ "Unable to locate hive jars to connect to metastore. " +
+ "Please set spark.sql.hive.metastore.jars.")
+ }
+
+ logInfo(
+ s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
+ new IsolatedClientLoader(
+ version = metaVersion,
+ sparkConf = conf,
+ hadoopConf = hadoopConf,
+ execJars = jars.toSeq,
+ config = allConfig,
+ isolationOn = true,
+ barrierPrefixes = hiveMetastoreBarrierPrefixes,
+ sharedPrefixes = hiveMetastoreSharedPrefixes)
+ } else if (hiveMetastoreJars == "maven") {
+ // TODO: Support for loading the jars from an already downloaded location.
+ logInfo(
+ s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
+ IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = hiveMetastoreVersion,
+ hadoopVersion = VersionInfo.getVersion,
+ sparkConf = conf,
+ hadoopConf = hadoopConf,
+ config = allConfig,
+ barrierPrefixes = hiveMetastoreBarrierPrefixes,
+ sharedPrefixes = hiveMetastoreSharedPrefixes)
+ } else {
+ // Convert to files and expand any directories.
+ val jars =
+ hiveMetastoreJars
+ .split(File.pathSeparator)
+ .flatMap {
+ case path if new File(path).getName == "*" =>
+ val files = new File(path).getParentFile.listFiles()
+ if (files == null) {
+ logWarning(s"Hive jar path '$path' does not exist.")
+ Nil
+ } else {
+ files.filter(_.getName.toLowerCase.endsWith(".jar"))
+ }
+ case path =>
+ new File(path) :: Nil
+ }
+ .map(_.toURI.toURL)
+
+ logInfo(
+ s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
+ s"using ${jars.mkString(":")}")
+ new IsolatedClientLoader(
+ version = metaVersion,
+ sparkConf = conf,
+ hadoopConf = hadoopConf,
+ execJars = jars.toSeq,
+ config = allConfig,
+ isolationOn = true,
+ barrierPrefixes = hiveMetastoreBarrierPrefixes,
+ sharedPrefixes = hiveMetastoreSharedPrefixes)
+ }
+ isolatedLoader.createClient()
+ }
+
/** Constructs a configuration for hive, where the metastore is located in a temp directory. */
def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String] = {
val withInMemoryMode = if (useInMemoryDerby) "memory:" else ""
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 27e4cfc103..c7066d7363 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.DataTypeParser
@@ -98,27 +98,33 @@ private[hive] object HiveSerDe {
}
-// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext
+/**
+ * Legacy catalog for interacting with the Hive metastore.
+ *
+ * This is still used for things like creating data source tables, but in the future will be
+ * cleaned up to integrate more nicely with [[HiveCatalog]].
+ */
private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
- extends Catalog with Logging {
+ extends Logging {
val conf = hive.conf
- /** Usages should lock on `this`. */
- protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
-
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
- private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
+ private def getCurrentDatabase: String = {
+ hive.sessionState.catalog.getCurrentDatabase
+ }
+
+ def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
QualifiedTableName(
- tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
+ tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
tableIdent.table.toLowerCase)
}
private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
QualifiedTableName(
- t.name.database.getOrElse(client.currentDatabase).toLowerCase,
+ t.name.database.getOrElse(getCurrentDatabase).toLowerCase,
t.name.table.toLowerCase)
}
@@ -194,7 +200,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
}
- override def refreshTable(tableIdent: TableIdentifier): Unit = {
+ def refreshTable(tableIdent: TableIdentifier): Unit = {
// refreshTable does not eagerly reload the cache. It just invalidate the cache.
// Next time when we use the table, it will be populated in the cache.
// Since we also cache ParquetRelations converted from Hive Parquet tables and
@@ -408,12 +414,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
}
- override def tableExists(tableIdent: TableIdentifier): Boolean = {
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
- client.getTableOption(dbName, tblName).isDefined
- }
-
- override def lookupRelation(
+ def lookupRelation(
tableIdent: TableIdentifier,
alias: Option[String]): LogicalPlan = {
val qualifiedTableName = getQualifiedTableName(tableIdent)
@@ -555,12 +556,6 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
}
- override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- val db = databaseName.getOrElse(client.currentDatabase)
-
- client.listTables(db).map(tableName => (tableName, false))
- }
-
/**
* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
* data source relations for better performance.
@@ -716,27 +711,6 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
}
}
- /**
- * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
- * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
- */
- override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
- throw new UnsupportedOperationException
- }
-
- /**
- * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
- * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
- */
- override def unregisterTable(tableIdent: TableIdentifier): Unit = {
- throw new UnsupportedOperationException
- }
-
- override def unregisterAllTables(): Unit = {}
-
- override def setCurrentDatabase(databaseName: String): Unit = {
- client.setCurrentDatabase(databaseName)
- }
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
new file mode 100644
index 0000000000..aa44cba4b5
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+class HiveSessionCatalog(
+ externalCatalog: HiveCatalog,
+ client: HiveClient,
+ context: HiveContext,
+ conf: SQLConf)
+ extends SessionCatalog(externalCatalog, conf) {
+
+ override def setCurrentDatabase(db: String): Unit = {
+ super.setCurrentDatabase(db)
+ client.setCurrentDatabase(db)
+ }
+
+ override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
+ val table = formatTableName(name.table)
+ if (name.database.isDefined || !tempTables.containsKey(table)) {
+ val newName = name.copy(table = table)
+ metastoreCatalog.lookupRelation(newName, alias)
+ } else {
+ val relation = tempTables.get(table)
+ val tableWithQualifiers = SubqueryAlias(table, relation)
+ // If an alias was specified by the lookup, wrap the plan in a subquery so that
+ // attributes are properly qualified with this alias.
+ alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
+ }
+ }
+
+ // ----------------------------------------------------------------
+ // | Methods and fields for interacting with HiveMetastoreCatalog |
+ // ----------------------------------------------------------------
+
+ // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
+ // essentially a cache for metastore tables. However, it relies on a lot of session-specific
+ // things so it would be a lot of work to split its functionality between HiveSessionCatalog
+ // and HiveCatalog. We should still do it at some point...
+ private val metastoreCatalog = new HiveMetastoreCatalog(client, context)
+
+ val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
+ val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
+ val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts
+
+ override def refreshTable(name: TableIdentifier): Unit = {
+ metastoreCatalog.refreshTable(name)
+ }
+
+ def invalidateTable(name: TableIdentifier): Unit = {
+ metastoreCatalog.invalidateTable(name)
+ }
+
+ def invalidateCache(): Unit = {
+ metastoreCatalog.cachedDataSourceTables.invalidateAll()
+ }
+
+ def createDataSourceTable(
+ name: TableIdentifier,
+ userSpecifiedSchema: Option[StructType],
+ partitionColumns: Array[String],
+ bucketSpec: Option[BucketSpec],
+ provider: String,
+ options: Map[String, String],
+ isExternal: Boolean): Unit = {
+ metastoreCatalog.createDataSourceTable(
+ name, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal)
+ }
+
+ def hiveDefaultTableFilePath(name: TableIdentifier): String = {
+ metastoreCatalog.hiveDefaultTableFilePath(name)
+ }
+
+ // For testing only
+ private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+ val key = metastoreCatalog.getQualifiedTableName(table)
+ metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
+ }
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index d9cd96d66f..caa7f296ed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.execution.{python, SparkPlanner}
import org.apache.spark.sql.execution.datasources._
@@ -35,9 +35,11 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
}
/**
- * A metadata catalog that points to the Hive metastore.
+ * Internal catalog for managing table and database states.
*/
- override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog
+ override lazy val catalog = {
+ new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, conf)
+ }
/**
* Internal catalog for managing functions registered by the user.
@@ -61,7 +63,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
DataSourceAnalysis ::
(if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
- override val extendedCheckRules = Seq(PreWriteCheck(catalog))
+ override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index d214e5288e..f4d30358ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -53,9 +53,6 @@ private[hive] trait HiveClient {
/** Returns the names of tables in the given database that matches the given pattern. */
def listTables(dbName: String, pattern: String): Seq[String]
- /** Returns the name of the active database. */
- def currentDatabase: String
-
/** Sets the name of current database. */
def setCurrentDatabase(databaseName: String): Unit
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 928408c52b..e4e15d13df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -241,10 +241,6 @@ private[hive] class HiveClientImpl(
state.err = stream
}
- override def currentDatabase: String = withHiveState {
- state.getCurrentDatabase
- }
-
override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
if (getDatabaseOption(databaseName).isDefined) {
state.setCurrentDatabase(databaseName)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 391e2975d0..5a61eef0f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -69,10 +69,10 @@ case class CreateTableAsSelect(
withFormat
}
- hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false)
+ hiveContext.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
// Get the Metastore Relation
- hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match {
+ hiveContext.sessionState.catalog.lookupRelation(tableIdentifier) match {
case r: MetastoreRelation => r
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 8a1cf2caaa..9ff520da1d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -56,7 +56,7 @@ private[hive] case class CreateViewAsSelect(
case true if orReplace =>
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
- hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext))
+ hiveContext.metadataHive.alertView(prepareTable(sqlContext))
case true =>
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
@@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect(
"CREATE OR REPLACE VIEW AS")
case false =>
- hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext))
+ hiveContext.metadataHive.createView(prepareTable(sqlContext))
}
Seq.empty[Row]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 4ffd868242..430fa4616f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -45,7 +45,7 @@ case class InsertIntoHiveTable(
@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
@transient private lazy val hiveContext = new Context(sc.hiveconf)
- @transient private lazy val catalog = sc.sessionState.catalog
+ @transient private lazy val client = sc.metadataHive
def output: Seq[Attribute] = Seq.empty
@@ -186,8 +186,8 @@ case class InsertIntoHiveTable(
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
if (numDynamicPartitions > 0) {
- catalog.synchronized {
- catalog.client.loadDynamicPartitions(
+ client.synchronized {
+ client.loadDynamicPartitions(
outputPath.toString,
qualifiedTableName,
orderedPartitionSpec,
@@ -202,12 +202,12 @@ case class InsertIntoHiveTable(
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
// scalastyle:on
val oldPart =
- catalog.client.getPartitionOption(
- catalog.client.getTable(table.databaseName, table.tableName),
+ client.getPartitionOption(
+ client.getTable(table.databaseName, table.tableName),
partitionSpec)
if (oldPart.isEmpty || !ifNotExists) {
- catalog.client.loadPartition(
+ client.loadPartition(
outputPath.toString,
qualifiedTableName,
orderedPartitionSpec,
@@ -218,7 +218,7 @@ case class InsertIntoHiveTable(
}
}
} else {
- catalog.client.loadTable(
+ client.loadTable(
outputPath.toString, // TODO: URI
qualifiedTableName,
overwrite,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 226b8e1796..cd26a68f35 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,8 @@ case class DropTable(
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
- hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName))
+ hiveContext.sessionState.catalog.dropTable(
+ TableIdentifier(tableName), ignoreIfNotExists = true)
Seq.empty[Row]
}
}
@@ -142,7 +143,8 @@ case class CreateMetastoreDataSource(
val optionsWithPath =
if (!options.contains("path") && managedIfNoPath) {
isExternal = false
- options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
+ options + ("path" ->
+ hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
} else {
options
}
@@ -200,7 +202,8 @@ case class CreateMetastoreDataSourceAsSelect(
val optionsWithPath =
if (!options.contains("path")) {
isExternal = false
- options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
+ options + ("path" ->
+ hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
} else {
options
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 19c05f9cb0..a1785ca038 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -24,6 +24,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.implicitConversions
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.ql.processors._
@@ -35,9 +37,11 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.command.CacheTableCommand
+import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -55,10 +59,6 @@ object TestHive
// SPARK-8910
.set("spark.ui.enabled", "false")))
-trait TestHiveSingleton {
- protected val sqlContext: SQLContext = TestHive
- protected val hiveContext: TestHiveContext = TestHive
-}
/**
* A locally running test instance of Spark's Hive execution engine.
@@ -71,10 +71,87 @@ trait TestHiveSingleton {
* hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of
* test cases that rely on TestHive must be serialized.
*/
-class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
- self =>
+class TestHiveContext private[hive](
+ sc: SparkContext,
+ cacheManager: CacheManager,
+ listener: SQLListener,
+ executionHive: HiveClientImpl,
+ metadataHive: HiveClient,
+ isRootContext: Boolean,
+ hiveCatalog: HiveCatalog,
+ val warehousePath: File,
+ val scratchDirPath: File,
+ metastoreTemporaryConf: Map[String, String])
+ extends HiveContext(
+ sc,
+ cacheManager,
+ listener,
+ executionHive,
+ metadataHive,
+ isRootContext,
+ hiveCatalog) { self =>
+
+ // Unfortunately, due to the complex interactions between the construction parameters
+ // and the limitations in scala constructors, we need many of these constructors to
+ // provide a shorthand to create a new TestHiveContext with only a SparkContext.
+ // This is not a great design pattern but it's necessary here.
+
+ private def this(
+ sc: SparkContext,
+ executionHive: HiveClientImpl,
+ metadataHive: HiveClient,
+ warehousePath: File,
+ scratchDirPath: File,
+ metastoreTemporaryConf: Map[String, String]) {
+ this(
+ sc,
+ new CacheManager,
+ SQLContext.createListenerAndUI(sc),
+ executionHive,
+ metadataHive,
+ true,
+ new HiveCatalog(metadataHive),
+ warehousePath,
+ scratchDirPath,
+ metastoreTemporaryConf)
+ }
+
+ private def this(
+ sc: SparkContext,
+ warehousePath: File,
+ scratchDirPath: File,
+ metastoreTemporaryConf: Map[String, String]) {
+ this(
+ sc,
+ HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+ TestHiveContext.newClientForMetadata(
+ sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf),
+ warehousePath,
+ scratchDirPath,
+ metastoreTemporaryConf)
+ }
- import HiveContext._
+ def this(sc: SparkContext) {
+ this(
+ sc,
+ Utils.createTempDir(namePrefix = "warehouse"),
+ TestHiveContext.makeScratchDir(),
+ HiveContext.newTemporaryConfiguration(useInMemoryDerby = false))
+ }
+
+ override def newSession(): HiveContext = {
+ new TestHiveContext(
+ sc = sc,
+ cacheManager = cacheManager,
+ listener = listener,
+ executionHive = executionHive.newSession(),
+ metadataHive = metadataHive.newSession(),
+ isRootContext = false,
+ hiveCatalog = hiveCatalog,
+ warehousePath = warehousePath,
+ scratchDirPath = scratchDirPath,
+ metastoreTemporaryConf = metastoreTemporaryConf)
+ }
// By clearing the port we force Spark to pick a new one. This allows us to rerun tests
// without restarting the JVM.
@@ -83,24 +160,12 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
hiveconf.set("hive.plan.serialization.format", "javaXML")
- lazy val warehousePath = Utils.createTempDir(namePrefix = "warehouse-")
-
- lazy val scratchDirPath = {
- val dir = Utils.createTempDir(namePrefix = "scratch-")
- dir.delete()
- dir
- }
-
- private lazy val temporaryConfig = newTemporaryConfiguration(useInMemoryDerby = false)
-
- /** Sets up the system initially or after a RESET command */
- protected override def configure(): Map[String, String] = {
- super.configure() ++ temporaryConfig ++ Map(
- ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
- ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
- ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
- ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1"
- )
+ // A snapshot of the entries in the starting SQLConf
+ // We save this because tests can mutate this singleton object if they want
+ val initialSQLConf: SQLConf = {
+ val snapshot = new SQLConf
+ conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
+ snapshot
}
val testTempDir = Utils.createTempDir()
@@ -427,9 +492,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
cacheManager.clearCache()
loadedTables.clear()
- sessionState.catalog.cachedDataSourceTables.invalidateAll()
- sessionState.catalog.client.reset()
- sessionState.catalog.unregisterAllTables()
+ sessionState.catalog.clearTempTables()
+ sessionState.catalog.invalidateCache()
+ metadataHive.reset()
FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
@@ -448,13 +513,13 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
// Lots of tests fail if we do not change the partition whitelist from the default.
runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
- configure().foreach {
- case (k, v) =>
- metadataHive.runSqlHive(s"SET $k=$v")
- }
+ // In case a test changed any of these values, restore all the original ones here.
+ TestHiveContext.hiveClientConfigurations(
+ hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf)
+ .foreach { case (k, v) => metadataHive.runSqlHive(s"SET $k=$v") }
defaultOverrides()
- runSqlHive("USE default")
+ sessionState.catalog.setCurrentDatabase("default")
} catch {
case e: Exception =>
logError("FATAL ERROR: Failed to reset TestDB state.", e)
@@ -490,4 +555,43 @@ private[hive] object TestHiveContext {
// Fewer shuffle partitions to speed up testing.
SQLConf.SHUFFLE_PARTITIONS.key -> "5"
)
+
+ /**
+ * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
+ */
+ private def newClientForMetadata(
+ conf: SparkConf,
+ hadoopConf: Configuration,
+ warehousePath: File,
+ scratchDirPath: File,
+ metastoreTemporaryConf: Map[String, String]): HiveClient = {
+ val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
+ HiveContext.newClientForMetadata(
+ conf,
+ hiveConf,
+ hadoopConf,
+ hiveClientConfigurations(hiveConf, warehousePath, scratchDirPath, metastoreTemporaryConf))
+ }
+
+ /**
+ * Configurations needed to create a [[HiveClient]].
+ */
+ private def hiveClientConfigurations(
+ hiveconf: HiveConf,
+ warehousePath: File,
+ scratchDirPath: File,
+ metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
+ HiveContext.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map(
+ ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
+ ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
+ ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
+ ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1")
+ }
+
+ private def makeScratchDir(): File = {
+ val scratchDir = Utils.createTempDir(namePrefix = "scratch")
+ scratchDir.delete()
+ scratchDir
+ }
+
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index bd14a243ea..2fc38e2b2d 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -70,8 +70,9 @@ public class JavaMetastoreDataSourcesSuite {
if (path.exists()) {
path.delete();
}
- hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
- new TableIdentifier("javaSavedTable")));
+ hiveManagedPath = new Path(
+ sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
+ new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
if (fs.exists(hiveManagedPath)){
fs.delete(hiveManagedPath, true);
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
new file mode 100644
index 0000000000..154ada3daa
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.test
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SQLContext
+
+
+trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
+ protected val sqlContext: SQLContext = TestHive
+ protected val hiveContext: TestHiveContext = TestHive
+
+ protected override def afterAll(): Unit = {
+ try {
+ hiveContext.reset()
+ } finally {
+ super.afterAll()
+ }
+ }
+
+}
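
The trait above is how the test suites touched later in this patch obtain the shared TestHive context and have it reset once the suite finishes. As an illustration only (the suite name and table below are hypothetical, not from the patch), a consumer would look like this:

// Hypothetical example of mixing in TestHiveSingleton.
package org.apache.spark.sql.hive

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHiveSingleton

class ExampleHiveSuite extends QueryTest with TestHiveSingleton {
  import hiveContext.implicits._

  test("temp table round trip") {
    val df = Seq((1, "a"), (2, "b")).toDF("key", "value")
    // registerDataFrameAsTable / dropTempTable mirror the usage in
    // HiveDataFrameAnalyticsSuite later in this patch.
    hiveContext.registerDataFrameAsTable(df, "example_tmp")
    checkAnswer(sqlContext.table("example_tmp"), df)
    hiveContext.dropTempTable("example_tmp")
  }
  // No afterAll() needed here: TestHiveSingleton.afterAll() calls hiveContext.reset().
}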
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala
index 72765f05e7..4c9c48a25c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala
@@ -26,6 +26,7 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils {
import testImplicits._
protected override def beforeAll(): Unit = {
+ super.beforeAll()
sql("DROP TABLE IF EXISTS t0")
sql("DROP TABLE IF EXISTS t1")
sql("DROP TABLE IF EXISTS t2")
@@ -43,9 +44,13 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils {
}
override protected def afterAll(): Unit = {
- sql("DROP TABLE IF EXISTS t0")
- sql("DROP TABLE IF EXISTS t1")
- sql("DROP TABLE IF EXISTS t2")
+ try {
+ sql("DROP TABLE IF EXISTS t0")
+ sql("DROP TABLE IF EXISTS t1")
+ sql("DROP TABLE IF EXISTS t2")
+ } finally {
+ super.afterAll()
+ }
}
private def checkSqlGeneration(hiveQl: String): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
new file mode 100644
index 0000000000..b644a50613
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.hive.test.TestHive
+
+
+class HiveContextSuite extends SparkFunSuite {
+
+ test("HiveContext can access `spark.sql.*` configs") {
+ // Avoid creating another SparkContext in the same JVM
+ val sc = TestHive.sparkContext
+ require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") ==
+ "org.apache.spark.sql.hive.execution.PairSerDe")
+ assert(TestHive.initialSQLConf.getConfString("spark.sql.hive.metastore.barrierPrefixes") ==
+ "org.apache.spark.sql.hive.execution.PairSerDe")
+ assert(TestHive.metadataHive.getConf("spark.sql.hive.metastore.barrierPrefixes", "") ==
+ "org.apache.spark.sql.hive.execution.PairSerDe")
+ }
+
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
index 35e433964d..57f96e725a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
@@ -33,12 +33,17 @@ class HiveDataFrameAnalyticsSuite extends QueryTest with TestHiveSingleton with
private var testData: DataFrame = _
override def beforeAll() {
+ super.beforeAll()
testData = Seq((1, 2), (2, 2), (3, 4)).toDF("a", "b")
hiveContext.registerDataFrameAsTable(testData, "mytable")
}
override def afterAll(): Unit = {
- hiveContext.dropTempTable("mytable")
+ try {
+ hiveContext.dropTempTable("mytable")
+ } finally {
+ super.afterAll()
+ }
}
test("rollup") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index ce7b08ab72..6967395613 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.hive
import java.io.File
-import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
-class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton {
+class HiveMetastoreCatalogSuite extends TestHiveSingleton {
import hiveContext.implicits._
test("struct field should accept underscore in sub-column name") {
@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}
- val hiveTable = sessionState.catalog.client.getTable("default", "t")
+ val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}
- val hiveTable = sessionState.catalog.client.getTable("default", "t")
+ val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
|AS SELECT 1 AS d1, "val_1" AS d2
""".stripMargin)
- val hiveTable = sessionState.catalog.client.getTable("default", "t")
+ val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 0a31ac64a2..5272f4192e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -31,18 +31,25 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
val df = sparkContext.parallelize((1 to 10).map(i => (i, s"str$i"))).toDF("key", "value")
override def beforeAll(): Unit = {
+ super.beforeAll()
// The catalog in HiveContext is a case insensitive one.
- sessionState.catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
+ sessionState.catalog.createTempTable(
+ "ListTablesSuiteTable", df.logicalPlan, ignoreIfExists = true)
sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
}
override def afterAll(): Unit = {
- sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
- sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
- sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
- sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
+ try {
+ sessionState.catalog.dropTable(
+ TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
+ sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
+ sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
+ sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
+ } finally {
+ super.afterAll()
+ }
}
test("get all tables of current database") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index f6b9072da4..c9bcf819ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -27,6 +27,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
import testImplicits._
protected override def beforeAll(): Unit = {
+ super.beforeAll()
sql("DROP TABLE IF EXISTS parquet_t0")
sql("DROP TABLE IF EXISTS parquet_t1")
sql("DROP TABLE IF EXISTS parquet_t2")
@@ -64,11 +65,15 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
}
override protected def afterAll(): Unit = {
- sql("DROP TABLE IF EXISTS parquet_t0")
- sql("DROP TABLE IF EXISTS parquet_t1")
- sql("DROP TABLE IF EXISTS parquet_t2")
- sql("DROP TABLE IF EXISTS parquet_t3")
- sql("DROP TABLE IF EXISTS t0")
+ try {
+ sql("DROP TABLE IF EXISTS parquet_t0")
+ sql("DROP TABLE IF EXISTS parquet_t1")
+ sql("DROP TABLE IF EXISTS parquet_t2")
+ sql("DROP TABLE IF EXISTS parquet_t3")
+ sql("DROP TABLE IF EXISTS t0")
+ } finally {
+ super.afterAll()
+ }
}
private def checkHiveQl(hiveQl: String): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 3f3d0692b7..71652897e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -44,6 +44,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
var jsonFilePath: String = _
override def beforeAll(): Unit = {
+ super.beforeAll()
jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
}
@@ -693,13 +694,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
test("SPARK-6024 wide schema support") {
withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
withTable("wide_schema") {
- withTempDir( tempDir => {
+ withTempDir { tempDir =>
// We will need 80 splits for this schema if the threshold is 4000.
val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
// Manually create a metastore data source table.
sessionState.catalog.createDataSourceTable(
- tableIdent = TableIdentifier("wide_schema"),
+ name = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
bucketSpec = None,
@@ -711,7 +712,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val actualSchema = table("wide_schema").schema
assert(schema === actualSchema)
- })
+ }
}
}
}
@@ -737,7 +738,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
"spark.sql.sources.schema" -> schema.json,
"EXTERNAL" -> "FALSE"))
- sessionState.catalog.client.createTable(hiveTable, ignoreIfExists = false)
+ hiveCatalog.createTable("default", hiveTable, ignoreIfExists = false)
invalidateTable(tableName)
val actualSchema = table(tableName).schema
@@ -752,7 +753,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
withTable(tableName) {
df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
invalidateTable(tableName)
- val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
+ val metastoreTable = hiveCatalog.getTable("default", tableName)
val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
@@ -787,7 +788,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.sortBy("c")
.saveAsTable(tableName)
invalidateTable(tableName)
- val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
+ val metastoreTable = hiveCatalog.getTable("default", tableName)
val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val expectedSortByColumns = StructType(df.schema("c") :: Nil)
@@ -903,11 +904,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
test("skip hive metadata on table creation") {
- withTempDir(tempPath => {
+ withTempDir { tempPath =>
val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
sessionState.catalog.createDataSourceTable(
- tableIdent = TableIdentifier("not_skip_hive_metadata"),
+ name = TableIdentifier("not_skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
bucketSpec = None,
@@ -917,11 +918,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// As a proxy for verifying that the table was stored in Hive compatible format,
// we verify that each column of the table is of native type StringType.
- assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
+ assert(hiveCatalog.getTable("default", "not_skip_hive_metadata").schema
.forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
sessionState.catalog.createDataSourceTable(
- tableIdent = TableIdentifier("skip_hive_metadata"),
+ name = TableIdentifier("skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
bucketSpec = None,
@@ -929,10 +930,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"),
isExternal = false)
- // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
- // the table has a column type as array of StringType.
- assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
- .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
- })
+ // As a proxy for verifying that the table was stored in SparkSQL format,
+ // we verify that the table has a column type as array of StringType.
+ assert(hiveCatalog.getTable("default", "skip_hive_metadata").schema.forall { c =>
+ HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType)
+ })
+ }
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index f3af60a018..3c003506ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -25,9 +25,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
private lazy val df = sqlContext.range(10).coalesce(1).toDF()
private def checkTablePath(dbName: String, tableName: String): Unit = {
- val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName)
- val expectedPath =
- hiveContext.sessionState.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
+ val metastoreTable = hiveContext.hiveCatalog.getTable(dbName, tableName)
+ val expectedPath = hiveContext.hiveCatalog.getDatabase(dbName).locationUri + "/" + tableName
assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 151aacbdd1..ae026ed496 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -121,7 +121,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
intercept[UnsupportedOperationException] {
hiveContext.analyze("tempTable")
}
- hiveContext.sessionState.catalog.unregisterTable(TableIdentifier("tempTable"))
+ hiveContext.sessionState.catalog.dropTable(
+ TableIdentifier("tempTable"), ignoreIfNotExists = true)
}
test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 295069228f..d59bca4c7e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -171,10 +171,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert(client.listTables("default") === Seq("src"))
}
- test(s"$version: currentDatabase") {
- assert(client.currentDatabase === "default")
- }
-
test(s"$version: getDatabase") {
client.getDatabase("default")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 81fd71201d..94fbcb7ee2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -189,10 +189,14 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
}
override def afterAll(): Unit = {
- sqlContext.sql("DROP TABLE IF EXISTS agg1")
- sqlContext.sql("DROP TABLE IF EXISTS agg2")
- sqlContext.sql("DROP TABLE IF EXISTS agg3")
- sqlContext.dropTempTable("emptyTable")
+ try {
+ sqlContext.sql("DROP TABLE IF EXISTS agg1")
+ sqlContext.sql("DROP TABLE IF EXISTS agg2")
+ sqlContext.sql("DROP TABLE IF EXISTS agg3")
+ sqlContext.dropTempTable("emptyTable")
+ } finally {
+ super.afterAll()
+ }
}
test("group by function") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 5fe85eaef2..197a123905 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -49,6 +49,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
import org.apache.spark.sql.hive.test.TestHive.implicits._
override def beforeAll() {
+ super.beforeAll()
TestHive.cacheTables = true
// Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
@@ -57,11 +58,14 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}
override def afterAll() {
- TestHive.cacheTables = false
- TimeZone.setDefault(originalTimeZone)
- Locale.setDefault(originalLocale)
- sql("DROP TEMPORARY FUNCTION udtf_count2")
- super.afterAll()
+ try {
+ TestHive.cacheTables = false
+ TimeZone.setDefault(originalTimeZone)
+ Locale.setDefault(originalLocale)
+ sql("DROP TEMPORARY FUNCTION udtf_count2")
+ } finally {
+ super.afterAll()
+ }
}
test("SPARK-4908: concurrent hive native commands") {
@@ -1209,7 +1213,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
sql("USE hive_test_db")
assert("hive_test_db" == sql("select current_database()").first().getString(0))
- intercept[NoSuchDatabaseException] {
+ intercept[AnalysisException] {
sql("USE not_existing_db")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index d7c529ab0e..b0e263dff9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -47,7 +47,7 @@ case class ListStringCaseClass(l: Seq[String])
*/
class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
- import hiveContext.{udf, sql}
+ import hiveContext.udf
import hiveContext.implicits._
test("spark sql udf test that returns a struct") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index bc8896d4bd..6199253d34 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1325,6 +1325,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
.format("parquet")
.save(path)
+ // We don't support creating a temporary table while specifying a database
val message = intercept[AnalysisException] {
sqlContext.sql(
s"""
@@ -1335,9 +1336,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|)
""".stripMargin)
}.getMessage
- assert(message.contains("Specifying database name or other qualifiers are not allowed"))
- // If you use backticks to quote the name of a temporary table having dot in it.
+      // However, if you use backticks to quote a name containing a dot, it is allowed.
sqlContext.sql(
s"""
|CREATE TEMPORARY TABLE `db.t`
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
index ea82b8c459..c6b7eb6366 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.test.SQLTestUtils
class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
override def beforeAll(): Unit = {
+ super.beforeAll()
sql("DROP TABLE IF EXISTS part")
sql(
"""
@@ -50,7 +51,11 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
override def afterAll(): Unit = {
- sql("DROP TABLE IF EXISTS part")
+ try {
+ sql("DROP TABLE IF EXISTS part")
+ } finally {
+ super.afterAll()
+ }
}
test("windowing.q -- 15. testExpressions") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index cc412241fb..92f424bac7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
- sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
+ sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
}
test("overwriting") {
@@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), data.map(Row.fromTuple))
}
- sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
+ sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
}
test("self-join") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index fe446774ef..bdd3428a89 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -68,8 +68,12 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
}
override def afterAll(): Unit = {
- orcTableDir.delete()
- orcTableAsDir.delete()
+ try {
+ orcTableDir.delete()
+ orcTableAsDir.delete()
+ } finally {
+ super.afterAll()
+ }
}
test("create temporary orc table") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index bb53179c3c..b6fc61d453 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.DataSourceScan
import org.apache.spark.sql.execution.command.ExecutedCommand
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
@@ -425,10 +426,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
test("Caching converted data source Parquet Relations") {
- val _catalog = sessionState.catalog
- def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
+ def checkCached(tableIdentifier: TableIdentifier): Unit = {
// Converted test_parquet should be cached.
- sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+ sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK
case other =>
@@ -453,17 +453,17 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
- var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
+ var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default"))
// First, make sure the converted test_parquet is not cached.
- assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
// Table lookup will make the table cached.
table("test_insert_parquet")
checkCached(tableIdentifier)
// For insert into non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
invalidateTable("test_insert_parquet")
- assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_insert_parquet
@@ -476,7 +476,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
sql("select a, b from jt").collect())
// Invalidate the cache.
invalidateTable("test_insert_parquet")
- assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
// Create a partitioned table.
sql(
@@ -493,8 +493,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
- tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
- assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default"))
+ assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -503,14 +503,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)
// Right now, insert into a partitioned Parquet is not supported in data source Parquet.
// So, we expect it is not cached.
- assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (`date`='2015-04-02')
|select a, b from jt
""".stripMargin)
- assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")
@@ -526,7 +526,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin).collect())
invalidateTable("test_parquet_partitioned_cache_test")
- assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
}
@@ -700,6 +700,7 @@ abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with
var partitionedTableDirWithKeyAndComplexTypes: File = null
override def beforeAll(): Unit = {
+ super.beforeAll()
partitionedTableDir = Utils.createTempDir()
normalTableDir = Utils.createTempDir()