diff options
author | Andrew Or <andrew@databricks.com> | 2016-03-23 22:21:15 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-03-23 22:21:15 -0700 |
commit | c44d140cae99d0b880e6d25f158125ad3adc6a05 (patch) | |
tree | 7f0e5324e67efeff2cccf661cd27a21c3618098c /sql/hive | |
parent | cf823bead18c5be86b36da59b4bbf935c4804d04 (diff) | |
download | spark-c44d140cae99d0b880e6d25f158125ad3adc6a05.tar.gz spark-c44d140cae99d0b880e6d25f158125ad3adc6a05.tar.bz2 spark-c44d140cae99d0b880e6d25f158125ad3adc6a05.zip |
Revert "[SPARK-14014][SQL] Replace existing catalog with SessionCatalog"
This reverts commit 5dfc01976bb0d72489620b4f32cc12d620bb6260.
Diffstat (limited to 'sql/hive')
24 files changed, 384 insertions, 631 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala index 0722fb02a8..491f2aebb4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala @@ -85,6 +85,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit withClient { getTable(db, table) } } + // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- @@ -181,10 +182,6 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit client.getTable(db, table) } - override def tableExists(db: String, table: String): Boolean = withClient { - client.getTableOption(db, table).isDefined - } - override def listTables(db: String): Seq[String] = withClient { requireDbExists(db) client.listTables(db) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index ca3ce43591..914f8e9a98 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -28,7 +28,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.language.implicitConversions -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.common.`type`.HiveDecimal @@ -39,7 +38,7 @@ import org.apache.hadoop.hive.ql.parse.VariableSubstitution import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -53,7 +52,6 @@ import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand} import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SQLConfEntry import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._ import org.apache.spark.sql.types._ @@ -69,7 +67,7 @@ private[hive] case class CurrentDatabase(ctx: HiveContext) override def foldable: Boolean = true override def nullable: Boolean = false override def eval(input: InternalRow): Any = { - UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase) + UTF8String.fromString(ctx.metadataHive.currentDatabase) } } @@ -83,31 +81,15 @@ class HiveContext private[hive]( sc: SparkContext, cacheManager: CacheManager, listener: SQLListener, - @transient private[hive] val executionHive: HiveClientImpl, - @transient private[hive] val metadataHive: HiveClient, - isRootContext: Boolean, - @transient private[sql] val hiveCatalog: HiveCatalog) - extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging { + @transient private val execHive: HiveClientImpl, + @transient private val metaHive: HiveClient, + isRootContext: Boolean) + extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging { self => - private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) { - this( - sc, - new CacheManager, - SQLContext.createListenerAndUI(sc), - execHive, - metaHive, - true, - new HiveCatalog(metaHive)) - } - def this(sc: SparkContext) = { - this( - sc, - HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration), - HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration)) + this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true) } - def this(sc: JavaSparkContext) = this(sc.sc) import org.apache.spark.sql.hive.HiveContext._ @@ -124,10 +106,9 @@ class HiveContext private[hive]( sc = sc, cacheManager = cacheManager, listener = listener, - executionHive = executionHive.newSession(), - metadataHive = metadataHive.newSession(), - isRootContext = false, - hiveCatalog = hiveCatalog) + execHive = executionHive.newSession(), + metaHive = metadataHive.newSession(), + isRootContext = false) } @transient @@ -168,6 +149,41 @@ class HiveContext private[hive]( */ protected[sql] def convertCTAS: Boolean = getConf(CONVERT_CTAS) + /** + * The version of the hive client that will be used to communicate with the metastore. Note that + * this does not necessarily need to be the same version of Hive that is used internally by + * Spark SQL for execution. + */ + protected[hive] def hiveMetastoreVersion: String = getConf(HIVE_METASTORE_VERSION) + + /** + * The location of the jars that should be used to instantiate the HiveMetastoreClient. This + * property can be one of three options: + * - a classpath in the standard format for both hive and hadoop. + * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This + * option is only valid when using the execution version of Hive. + * - maven - download the correct version of hive on demand from maven. + */ + protected[hive] def hiveMetastoreJars: String = getConf(HIVE_METASTORE_JARS) + + /** + * A comma separated list of class prefixes that should be loaded using the classloader that + * is shared between Spark SQL and a specific version of Hive. An example of classes that should + * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need + * to be shared are those that interact with classes that are already shared. For example, + * custom appenders that are used by log4j. + */ + protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] = + getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "") + + /** + * A comma separated list of class prefixes that should explicitly be reloaded for each version + * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a + * prefix that typically would be shared (i.e. org.apache.spark.*) + */ + protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] = + getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "") + /* * hive thrift server use background spark sql thread pool to execute sql queries */ @@ -180,6 +196,29 @@ class HiveContext private[hive]( protected[sql] lazy val substitutor = new VariableSubstitution() /** + * The copy of the hive client that is used for execution. Currently this must always be + * Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the + * client is used for execution related tasks like registering temporary functions or ensuring + * that the ThreadLocal SessionState is correctly populated. This copy of Hive is *not* used + * for storing persistent metadata, and only point to a dummy metastore in a temporary directory. + */ + @transient + protected[hive] lazy val executionHive: HiveClientImpl = if (execHive != null) { + execHive + } else { + logInfo(s"Initializing execution hive, version $hiveExecutionVersion") + val loader = new IsolatedClientLoader( + version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion), + sparkConf = sc.conf, + execJars = Seq(), + hadoopConf = sc.hadoopConfiguration, + config = newTemporaryConfiguration(useInMemoryDerby = true), + isolationOn = false, + baseClassLoader = Utils.getContextOrSparkClassLoader) + loader.createClient().asInstanceOf[HiveClientImpl] + } + + /** * Overrides default Hive configurations to avoid breaking changes to Spark SQL users. * - allow SQL11 keywords to be used as identifiers */ @@ -189,6 +228,111 @@ class HiveContext private[hive]( defaultOverrides() + /** + * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore. + * The version of the Hive client that is used here must match the metastore that is configured + * in the hive-site.xml file. + */ + @transient + protected[hive] lazy val metadataHive: HiveClient = if (metaHive != null) { + metaHive + } else { + val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) + + // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options + // into the isolated client loader + val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf]) + + val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir") + logInfo("default warehouse location is " + defaultWarehouseLocation) + + // `configure` goes second to override other settings. + val allConfig = metadataConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configure + + val isolatedLoader = if (hiveMetastoreJars == "builtin") { + if (hiveExecutionVersion != hiveMetastoreVersion) { + throw new IllegalArgumentException( + "Builtin jars can only be used when hive execution version == hive metastore version. " + + s"Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " + + "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " + + s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.") + } + + // We recursively find all jars in the class loader chain, + // starting from the given classLoader. + def allJars(classLoader: ClassLoader): Array[URL] = classLoader match { + case null => Array.empty[URL] + case urlClassLoader: URLClassLoader => + urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent) + case other => allJars(other.getParent) + } + + val classLoader = Utils.getContextOrSparkClassLoader + val jars = allJars(classLoader) + if (jars.length == 0) { + throw new IllegalArgumentException( + "Unable to locate hive jars to connect to metastore. " + + "Please set spark.sql.hive.metastore.jars.") + } + + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.") + new IsolatedClientLoader( + version = metaVersion, + sparkConf = sc.conf, + execJars = jars.toSeq, + hadoopConf = sc.hadoopConfiguration, + config = allConfig, + isolationOn = true, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) + } else if (hiveMetastoreJars == "maven") { + // TODO: Support for loading the jars from an already downloaded location. + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.") + IsolatedClientLoader.forVersion( + hiveMetastoreVersion = hiveMetastoreVersion, + hadoopVersion = VersionInfo.getVersion, + sparkConf = sc.conf, + hadoopConf = sc.hadoopConfiguration, + config = allConfig, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) + } else { + // Convert to files and expand any directories. + val jars = + hiveMetastoreJars + .split(File.pathSeparator) + .flatMap { + case path if new File(path).getName() == "*" => + val files = new File(path).getParentFile().listFiles() + if (files == null) { + logWarning(s"Hive jar path '$path' does not exist.") + Nil + } else { + files.filter(_.getName().toLowerCase().endsWith(".jar")) + } + case path => + new File(path) :: Nil + } + .map(_.toURI.toURL) + + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + + s"using ${jars.mkString(":")}") + new IsolatedClientLoader( + version = metaVersion, + sparkConf = sc.conf, + execJars = jars.toSeq, + hadoopConf = sc.hadoopConfiguration, + config = allConfig, + isolationOn = true, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) + } + isolatedLoader.createClient() + } + protected[sql] override def parseSql(sql: String): LogicalPlan = { executionHive.withHiveState { super.parseSql(substitutor.substitute(hiveconf, sql)) @@ -288,7 +432,7 @@ class HiveContext private[hive]( // recorded in the Hive metastore. // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - sessionState.catalog.alterTable( + sessionState.catalog.client.alterTable( relation.table.copy( properties = relation.table.properties + (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString))) @@ -315,10 +459,64 @@ class HiveContext private[hive]( setConf(entry.key, entry.stringConverter(value)) } + /** Overridden by child classes that need to set configuration before the client init. */ + protected def configure(): Map[String, String] = { + // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch + // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards- + // compatibility when users are trying to connecting to a Hive metastore of lower version, + // because these options are expected to be integral values in lower versions of Hive. + // + // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according + // to their output time units. + Seq( + ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS, + ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS, + ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS, + ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS, + ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS, + ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS, + ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS, + ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS, + ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS, + ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS, + ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS + ).map { case (confVar, unit) => + confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString + }.toMap + } + /** * SQLConf and HiveConf contracts: * - * 1. create a new o.a.h.hive.ql.session.SessionState for each HiveContext + * 1. create a new SessionState for each HiveContext * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be * set in the SQLConf *as well as* in the HiveConf. @@ -402,7 +600,7 @@ class HiveContext private[hive]( } -private[hive] object HiveContext extends Logging { +private[hive] object HiveContext { /** The version of hive used internally by Spark SQL. */ val hiveExecutionVersion: String = "1.2.1" @@ -468,242 +666,6 @@ private[hive] object HiveContext extends Logging { defaultValue = Some(true), doc = "When set to true, Hive Thrift server executes SQL queries in an asynchronous way.") - /** - * The version of the hive client that will be used to communicate with the metastore. Note that - * this does not necessarily need to be the same version of Hive that is used internally by - * Spark SQL for execution. - */ - private def hiveMetastoreVersion(conf: SQLConf): String = { - conf.getConf(HIVE_METASTORE_VERSION) - } - - /** - * The location of the jars that should be used to instantiate the HiveMetastoreClient. This - * property can be one of three options: - * - a classpath in the standard format for both hive and hadoop. - * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This - * option is only valid when using the execution version of Hive. - * - maven - download the correct version of hive on demand from maven. - */ - private def hiveMetastoreJars(conf: SQLConf): String = { - conf.getConf(HIVE_METASTORE_JARS) - } - - /** - * A comma separated list of class prefixes that should be loaded using the classloader that - * is shared between Spark SQL and a specific version of Hive. An example of classes that should - * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need - * to be shared are those that interact with classes that are already shared. For example, - * custom appenders that are used by log4j. - */ - private def hiveMetastoreSharedPrefixes(conf: SQLConf): Seq[String] = { - conf.getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "") - } - - /** - * A comma separated list of class prefixes that should explicitly be reloaded for each version - * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a - * prefix that typically would be shared (i.e. org.apache.spark.*) - */ - private def hiveMetastoreBarrierPrefixes(conf: SQLConf): Seq[String] = { - conf.getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "") - } - - /** - * Configurations needed to create a [[HiveClient]]. - */ - private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = { - // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch - // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards- - // compatibility when users are trying to connecting to a Hive metastore of lower version, - // because these options are expected to be integral values in lower versions of Hive. - // - // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according - // to their output time units. - Seq( - ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS, - ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS, - ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS, - ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS, - ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS, - ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS, - ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS, - ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS, - ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS, - ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS, - ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS - ).map { case (confVar, unit) => - confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString - }.toMap - } - - /** - * Create a [[HiveClient]] used for execution. - * - * Currently this must always be Hive 13 as this is the version of Hive that is packaged - * with Spark SQL. This copy of the client is used for execution related tasks like - * registering temporary functions or ensuring that the ThreadLocal SessionState is - * correctly populated. This copy of Hive is *not* used for storing persistent metadata, - * and only point to a dummy metastore in a temporary directory. - */ - protected[hive] def newClientForExecution( - conf: SparkConf, - hadoopConf: Configuration): HiveClientImpl = { - logInfo(s"Initializing execution hive, version $hiveExecutionVersion") - val loader = new IsolatedClientLoader( - version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion), - sparkConf = conf, - execJars = Seq(), - hadoopConf = hadoopConf, - config = newTemporaryConfiguration(useInMemoryDerby = true), - isolationOn = false, - baseClassLoader = Utils.getContextOrSparkClassLoader) - loader.createClient().asInstanceOf[HiveClientImpl] - } - - /** - * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore. - * - * The version of the Hive client that is used here must match the metastore that is configured - * in the hive-site.xml file. - */ - private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = { - val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf]) - val configurations = hiveClientConfigurations(hiveConf) - newClientForMetadata(conf, hiveConf, hadoopConf, configurations) - } - - protected[hive] def newClientForMetadata( - conf: SparkConf, - hiveConf: HiveConf, - hadoopConf: Configuration, - configurations: Map[String, String]): HiveClient = { - val sqlConf = new SQLConf - sqlConf.setConf(SQLContext.getSQLProperties(conf)) - val hiveMetastoreVersion = HiveContext.hiveMetastoreVersion(sqlConf) - val hiveMetastoreJars = HiveContext.hiveMetastoreJars(sqlConf) - val hiveMetastoreSharedPrefixes = HiveContext.hiveMetastoreSharedPrefixes(sqlConf) - val hiveMetastoreBarrierPrefixes = HiveContext.hiveMetastoreBarrierPrefixes(sqlConf) - val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) - - val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir") - logInfo("default warehouse location is " + defaultWarehouseLocation) - - // `configure` goes second to override other settings. - val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations - - val isolatedLoader = if (hiveMetastoreJars == "builtin") { - if (hiveExecutionVersion != hiveMetastoreVersion) { - throw new IllegalArgumentException( - "Builtin jars can only be used when hive execution version == hive metastore version. " + - s"Execution: $hiveExecutionVersion != Metastore: $hiveMetastoreVersion. " + - "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " + - s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.") - } - - // We recursively find all jars in the class loader chain, - // starting from the given classLoader. - def allJars(classLoader: ClassLoader): Array[URL] = classLoader match { - case null => Array.empty[URL] - case urlClassLoader: URLClassLoader => - urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent) - case other => allJars(other.getParent) - } - - val classLoader = Utils.getContextOrSparkClassLoader - val jars = allJars(classLoader) - if (jars.length == 0) { - throw new IllegalArgumentException( - "Unable to locate hive jars to connect to metastore. " + - "Please set spark.sql.hive.metastore.jars.") - } - - logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.") - new IsolatedClientLoader( - version = metaVersion, - sparkConf = conf, - hadoopConf = hadoopConf, - execJars = jars.toSeq, - config = allConfig, - isolationOn = true, - barrierPrefixes = hiveMetastoreBarrierPrefixes, - sharedPrefixes = hiveMetastoreSharedPrefixes) - } else if (hiveMetastoreJars == "maven") { - // TODO: Support for loading the jars from an already downloaded location. - logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.") - IsolatedClientLoader.forVersion( - hiveMetastoreVersion = hiveMetastoreVersion, - hadoopVersion = VersionInfo.getVersion, - sparkConf = conf, - hadoopConf = hadoopConf, - config = allConfig, - barrierPrefixes = hiveMetastoreBarrierPrefixes, - sharedPrefixes = hiveMetastoreSharedPrefixes) - } else { - // Convert to files and expand any directories. - val jars = - hiveMetastoreJars - .split(File.pathSeparator) - .flatMap { - case path if new File(path).getName == "*" => - val files = new File(path).getParentFile.listFiles() - if (files == null) { - logWarning(s"Hive jar path '$path' does not exist.") - Nil - } else { - files.filter(_.getName.toLowerCase.endsWith(".jar")) - } - case path => - new File(path) :: Nil - } - .map(_.toURI.toURL) - - logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + - s"using ${jars.mkString(":")}") - new IsolatedClientLoader( - version = metaVersion, - sparkConf = conf, - hadoopConf = hadoopConf, - execJars = jars.toSeq, - config = allConfig, - isolationOn = true, - barrierPrefixes = hiveMetastoreBarrierPrefixes, - sharedPrefixes = hiveMetastoreSharedPrefixes) - } - isolatedLoader.createClient() - } - /** Constructs a configuration for hive, where the metastore is located in a temp directory. */ def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String] = { val withInMemoryMode = if (useInMemoryDerby) "memory:" else "" diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c7066d7363..27e4cfc103 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.DataTypeParser @@ -98,33 +98,27 @@ private[hive] object HiveSerDe { } -/** - * Legacy catalog for interacting with the Hive metastore. - * - * This is still used for things like creating data source tables, but in the future will be - * cleaned up to integrate more nicely with [[HiveCatalog]]. - */ +// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext) - extends Logging { + extends Catalog with Logging { val conf = hive.conf + /** Usages should lock on `this`. */ + protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf) + /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) - private def getCurrentDatabase: String = { - hive.sessionState.catalog.getCurrentDatabase - } - - def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { + private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { QualifiedTableName( - tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase, + tableIdent.database.getOrElse(client.currentDatabase).toLowerCase, tableIdent.table.toLowerCase) } private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = { QualifiedTableName( - t.name.database.getOrElse(getCurrentDatabase).toLowerCase, + t.name.database.getOrElse(client.currentDatabase).toLowerCase, t.name.table.toLowerCase) } @@ -200,7 +194,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) } - def refreshTable(tableIdent: TableIdentifier): Unit = { + override def refreshTable(tableIdent: TableIdentifier): Unit = { // refreshTable does not eagerly reload the cache. It just invalidate the cache. // Next time when we use the table, it will be populated in the cache. // Since we also cache ParquetRelations converted from Hive Parquet tables and @@ -414,7 +408,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString } - def lookupRelation( + override def tableExists(tableIdent: TableIdentifier): Boolean = { + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) + client.getTableOption(dbName, tblName).isDefined + } + + override def lookupRelation( tableIdent: TableIdentifier, alias: Option[String]): LogicalPlan = { val qualifiedTableName = getQualifiedTableName(tableIdent) @@ -556,6 +555,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) } + override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { + val db = databaseName.getOrElse(client.currentDatabase) + + client.listTables(db).map(tableName => (tableName, false)) + } + /** * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet * data source relations for better performance. @@ -711,6 +716,27 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte } } + /** + * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. + * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. + */ + override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { + throw new UnsupportedOperationException + } + + /** + * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. + * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. + */ + override def unregisterTable(tableIdent: TableIdentifier): Unit = { + throw new UnsupportedOperationException + } + + override def unregisterAllTables(): Unit = {} + + override def setCurrentDatabase(databaseName: String): Unit = { + client.setCurrentDatabase(databaseName) + } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala deleted file mode 100644 index aa44cba4b5..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.SessionCatalog -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.BucketSpec -import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType - - -class HiveSessionCatalog( - externalCatalog: HiveCatalog, - client: HiveClient, - context: HiveContext, - conf: SQLConf) - extends SessionCatalog(externalCatalog, conf) { - - override def setCurrentDatabase(db: String): Unit = { - super.setCurrentDatabase(db) - client.setCurrentDatabase(db) - } - - override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = { - val table = formatTableName(name.table) - if (name.database.isDefined || !tempTables.containsKey(table)) { - val newName = name.copy(table = table) - metastoreCatalog.lookupRelation(newName, alias) - } else { - val relation = tempTables.get(table) - val tableWithQualifiers = SubqueryAlias(table, relation) - // If an alias was specified by the lookup, wrap the plan in a subquery so that - // attributes are properly qualified with this alias. - alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) - } - } - - // ---------------------------------------------------------------- - // | Methods and fields for interacting with HiveMetastoreCatalog | - // ---------------------------------------------------------------- - - // Catalog for handling data source tables. TODO: This really doesn't belong here since it is - // essentially a cache for metastore tables. However, it relies on a lot of session-specific - // things so it would be a lot of work to split its functionality between HiveSessionCatalog - // and HiveCatalog. We should still do it at some point... - private val metastoreCatalog = new HiveMetastoreCatalog(client, context) - - val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions - val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables - val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts - - override def refreshTable(name: TableIdentifier): Unit = { - metastoreCatalog.refreshTable(name) - } - - def invalidateTable(name: TableIdentifier): Unit = { - metastoreCatalog.invalidateTable(name) - } - - def invalidateCache(): Unit = { - metastoreCatalog.cachedDataSourceTables.invalidateAll() - } - - def createDataSourceTable( - name: TableIdentifier, - userSpecifiedSchema: Option[StructType], - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - provider: String, - options: Map[String, String], - isExternal: Boolean): Unit = { - metastoreCatalog.createDataSourceTable( - name, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal) - } - - def hiveDefaultTableFilePath(name: TableIdentifier): String = { - metastoreCatalog.hiveDefaultTableFilePath(name) - } - - // For testing only - private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = { - val key = metastoreCatalog.getQualifiedTableName(table) - metastoreCatalog.cachedDataSourceTables.getIfPresent(key) - } - -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index caa7f296ed..d9cd96d66f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.execution.{python, SparkPlanner} import org.apache.spark.sql.execution.datasources._ @@ -35,11 +35,9 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) } /** - * Internal catalog for managing table and database states. + * A metadata catalog that points to the Hive metastore. */ - override lazy val catalog = { - new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, conf) - } + override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog /** * Internal catalog for managing functions registered by the user. @@ -63,7 +61,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) DataSourceAnalysis :: (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) - override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) + override val extendedCheckRules = Seq(PreWriteCheck(catalog)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index f4d30358ca..d214e5288e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -53,6 +53,9 @@ private[hive] trait HiveClient { /** Returns the names of tables in the given database that matches the given pattern. */ def listTables(dbName: String, pattern: String): Seq[String] + /** Returns the name of the active database. */ + def currentDatabase: String + /** Sets the name of current database. */ def setCurrentDatabase(databaseName: String): Unit diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index e4e15d13df..928408c52b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -241,6 +241,10 @@ private[hive] class HiveClientImpl( state.err = stream } + override def currentDatabase: String = withHiveState { + state.getCurrentDatabase + } + override def setCurrentDatabase(databaseName: String): Unit = withHiveState { if (getDatabaseOption(databaseName).isDefined) { state.setCurrentDatabase(databaseName) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 5a61eef0f2..391e2975d0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -69,10 +69,10 @@ case class CreateTableAsSelect( withFormat } - hiveContext.sessionState.catalog.createTable(withSchema, ignoreIfExists = false) + hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false) // Get the Metastore Relation - hiveContext.sessionState.catalog.lookupRelation(tableIdentifier) match { + hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match { case r: MetastoreRelation => r } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 9ff520da1d..8a1cf2caaa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -56,7 +56,7 @@ private[hive] case class CreateViewAsSelect( case true if orReplace => // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - hiveContext.metadataHive.alertView(prepareTable(sqlContext)) + hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext)) case true => // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already @@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect( "CREATE OR REPLACE VIEW AS") case false => - hiveContext.metadataHive.createView(prepareTable(sqlContext)) + hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext)) } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 430fa4616f..4ffd868242 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -45,7 +45,7 @@ case class InsertIntoHiveTable( @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient private lazy val hiveContext = new Context(sc.hiveconf) - @transient private lazy val client = sc.metadataHive + @transient private lazy val catalog = sc.sessionState.catalog def output: Seq[Attribute] = Seq.empty @@ -186,8 +186,8 @@ case class InsertIntoHiveTable( // TODO: Correctly set isSkewedStoreAsSubdir. val isSkewedStoreAsSubdir = false if (numDynamicPartitions > 0) { - client.synchronized { - client.loadDynamicPartitions( + catalog.synchronized { + catalog.client.loadDynamicPartitions( outputPath.toString, qualifiedTableName, orderedPartitionSpec, @@ -202,12 +202,12 @@ case class InsertIntoHiveTable( // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries // scalastyle:on val oldPart = - client.getPartitionOption( - client.getTable(table.databaseName, table.tableName), + catalog.client.getPartitionOption( + catalog.client.getTable(table.databaseName, table.tableName), partitionSpec) if (oldPart.isEmpty || !ifNotExists) { - client.loadPartition( + catalog.client.loadPartition( outputPath.toString, qualifiedTableName, orderedPartitionSpec, @@ -218,7 +218,7 @@ case class InsertIntoHiveTable( } } } else { - client.loadTable( + catalog.client.loadTable( outputPath.toString, // TODO: URI qualifiedTableName, overwrite, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index cd26a68f35..226b8e1796 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -71,8 +71,7 @@ case class DropTable( } hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.sessionState.catalog.dropTable( - TableIdentifier(tableName), ignoreIfNotExists = true) + hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName)) Seq.empty[Row] } } @@ -143,8 +142,7 @@ case class CreateMetastoreDataSource( val optionsWithPath = if (!options.contains("path") && managedIfNoPath) { isExternal = false - options + ("path" -> - hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) } else { options } @@ -202,8 +200,7 @@ case class CreateMetastoreDataSourceAsSelect( val optionsWithPath = if (!options.contains("path")) { isExternal = false - options + ("path" -> - hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) } else { options } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 1155903037..19c05f9cb0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -24,8 +24,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.implicitConversions -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.exec.FunctionRegistry import org.apache.hadoop.hive.ql.processors._ @@ -37,11 +35,9 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionInfo import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.command.CacheTableCommand -import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive._ -import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl} +import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -75,77 +71,10 @@ trait TestHiveSingleton { * hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of * test cases that rely on TestHive must be serialized. */ -class TestHiveContext private[hive]( - sc: SparkContext, - cacheManager: CacheManager, - listener: SQLListener, - executionHive: HiveClientImpl, - metadataHive: HiveClient, - isRootContext: Boolean, - hiveCatalog: HiveCatalog, - val warehousePath: File, - val scratchDirPath: File) - extends HiveContext( - sc, - cacheManager, - listener, - executionHive, - metadataHive, - isRootContext, - hiveCatalog) { self => - - // Unfortunately, due to the complex interactions between the construction parameters - // and the limitations in scala constructors, we need many of these constructors to - // provide a shorthand to create a new TestHiveContext with only a SparkContext. - // This is not a great design pattern but it's necessary here. - - private def this( - sc: SparkContext, - executionHive: HiveClientImpl, - metadataHive: HiveClient, - warehousePath: File, - scratchDirPath: File) { - this( - sc, - new CacheManager, - SQLContext.createListenerAndUI(sc), - executionHive, - metadataHive, - true, - new HiveCatalog(metadataHive), - warehousePath, - scratchDirPath) - } - - private def this(sc: SparkContext, warehousePath: File, scratchDirPath: File) { - this( - sc, - HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration), - TestHiveContext.newClientForMetadata( - sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath), - warehousePath, - scratchDirPath) - } +class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { + self => - def this(sc: SparkContext) { - this( - sc, - Utils.createTempDir(namePrefix = "warehouse"), - TestHiveContext.makeScratchDir()) - } - - override def newSession(): HiveContext = { - new TestHiveContext( - sc = sc, - cacheManager = cacheManager, - listener = listener, - executionHive = executionHive.newSession(), - metadataHive = metadataHive.newSession(), - isRootContext = false, - hiveCatalog = hiveCatalog, - warehousePath = warehousePath, - scratchDirPath = scratchDirPath) - } + import HiveContext._ // By clearing the port we force Spark to pick a new one. This allows us to rerun tests // without restarting the JVM. @@ -154,6 +83,26 @@ class TestHiveContext private[hive]( hiveconf.set("hive.plan.serialization.format", "javaXML") + lazy val warehousePath = Utils.createTempDir(namePrefix = "warehouse-") + + lazy val scratchDirPath = { + val dir = Utils.createTempDir(namePrefix = "scratch-") + dir.delete() + dir + } + + private lazy val temporaryConfig = newTemporaryConfiguration(useInMemoryDerby = false) + + /** Sets up the system initially or after a RESET command */ + protected override def configure(): Map[String, String] = { + super.configure() ++ temporaryConfig ++ Map( + ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString, + ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true", + ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString, + ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1" + ) + } + val testTempDir = Utils.createTempDir() // For some hive test case which contain ${system:test.tmp.dir} @@ -478,9 +427,9 @@ class TestHiveContext private[hive]( cacheManager.clearCache() loadedTables.clear() - sessionState.catalog.clearTempTables() - sessionState.catalog.invalidateCache() - metadataHive.reset() + sessionState.catalog.cachedDataSourceTables.invalidateAll() + sessionState.catalog.client.reset() + sessionState.catalog.unregisterAllTables() FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } @@ -499,8 +448,13 @@ class TestHiveContext private[hive]( // Lots of tests fail if we do not change the partition whitelist from the default. runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*") + configure().foreach { + case (k, v) => + metadataHive.runSqlHive(s"SET $k=$v") + } defaultOverrides() - sessionState.catalog.setCurrentDatabase("default") + + runSqlHive("USE default") } catch { case e: Exception => logError("FATAL ERROR: Failed to reset TestDB state.", e) @@ -536,43 +490,4 @@ private[hive] object TestHiveContext { // Fewer shuffle partitions to speed up testing. SQLConf.SHUFFLE_PARTITIONS.key -> "5" ) - - /** - * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore. - */ - private def newClientForMetadata( - conf: SparkConf, - hadoopConf: Configuration, - warehousePath: File, - scratchDirPath: File): HiveClient = { - val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf]) - HiveContext.newClientForMetadata( - conf, - hiveConf, - hadoopConf, - hiveClientConfigurations(hiveConf, warehousePath, scratchDirPath)) - } - - /** - * Configurations needed to create a [[HiveClient]]. - */ - private def hiveClientConfigurations( - hiveconf: HiveConf, - warehousePath: File, - scratchDirPath: File): Map[String, String] = { - HiveContext.hiveClientConfigurations(hiveconf) ++ - HiveContext.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map( - ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString, - ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true", - ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString, - ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1" - ) - } - - private def makeScratchDir(): File = { - val scratchDir = Utils.createTempDir(namePrefix = "scratch") - scratchDir.delete() - scratchDir - } - } diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index 2fc38e2b2d..bd14a243ea 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -70,9 +70,8 @@ public class JavaMetastoreDataSourcesSuite { if (path.exists()) { path.delete(); } - hiveManagedPath = new Path( - sqlContext.sessionState().catalog().hiveDefaultTableFilePath( - new TableIdentifier("javaSavedTable"))); + hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath( + new TableIdentifier("javaSavedTable"))); fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); if (fs.exists(hiveManagedPath)){ fs.delete(hiveManagedPath, true); diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala deleted file mode 100644 index fa0c4d92cd..0000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql.hive - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.hive.test.TestHive - - -class HiveContextSuite extends SparkFunSuite { - - // TODO: investigate; this passes locally but fails on Jenkins for some reason. - ignore("HiveContext can access `spark.sql.*` configs") { - // Avoid creating another SparkContext in the same JVM - val sc = TestHive.sparkContext - require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") == - "org.apache.spark.sql.hive.execution.PairSerDe") - assert(TestHive.getConf("spark.sql.hive.metastore.barrierPrefixes") == - "org.apache.spark.sql.hive.execution.PairSerDe") - assert(TestHive.metadataHive.getConf("spark.sql.hive.metastore.barrierPrefixes", "") == - "org.apache.spark.sql.hive.execution.PairSerDe") - } - -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 42cbfee10e..ce7b08ab72 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -21,7 +21,6 @@ import java.io.File import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{QueryTest, Row, SaveMode} -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -84,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default"))) + val hiveTable = sessionState.catalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -115,8 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = - sessionState.catalog.getTable(TableIdentifier("t", Some("default"))) + val hiveTable = sessionState.catalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -146,8 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite |AS SELECT 1 AS d1, "val_1" AS d2 """.stripMargin) - val hiveTable = - sessionState.catalog.getTable(TableIdentifier("t", Some("default"))) + val hiveTable = sessionState.catalog.client.getTable("default", "t") assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index c3b24623d1..0a31ac64a2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -32,16 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft override def beforeAll(): Unit = { // The catalog in HiveContext is a case insensitive one. - sessionState.catalog.createTempTable( - "ListTablesSuiteTable", df.logicalPlan, ignoreIfExists = true) + sessionState.catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan) sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)") sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB") sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)") } override def afterAll(): Unit = { - sessionState.catalog.dropTable( - TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) + sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable") sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable") sql("DROP DATABASE IF EXISTS ListTablesSuiteDB") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 7d2a4eb1de..3f3d0692b7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -693,13 +693,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("SPARK-6024 wide schema support") { withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") { withTable("wide_schema") { - withTempDir { tempDir => + withTempDir( tempDir => { // We will need 80 splits for this schema if the threshold is 4000. val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) // Manually create a metastore data source table. sessionState.catalog.createDataSourceTable( - name = TableIdentifier("wide_schema"), + tableIdent = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], bucketSpec = None, @@ -711,7 +711,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val actualSchema = table("wide_schema").schema assert(schema === actualSchema) - } + }) } } } @@ -737,7 +737,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv "spark.sql.sources.schema" -> schema.json, "EXTERNAL" -> "FALSE")) - hiveCatalog.createTable("default", hiveTable, ignoreIfExists = false) + sessionState.catalog.client.createTable(hiveTable, ignoreIfExists = false) invalidateTable(tableName) val actualSchema = table(tableName).schema @@ -752,7 +752,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTable(tableName) { df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = hiveCatalog.getTable("default", tableName) + val metastoreTable = sessionState.catalog.client.getTable("default", tableName) val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt @@ -787,7 +787,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .sortBy("c") .saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = hiveCatalog.getTable("default", tableName) + val metastoreTable = sessionState.catalog.client.getTable("default", tableName) val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val expectedSortByColumns = StructType(df.schema("c") :: Nil) @@ -903,11 +903,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("skip hive metadata on table creation") { - withTempDir { tempPath => + withTempDir(tempPath => { val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) sessionState.catalog.createDataSourceTable( - name = TableIdentifier("not_skip_hive_metadata"), + tableIdent = TableIdentifier("not_skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], bucketSpec = None, @@ -917,11 +917,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in Hive compatible format, // we verify that each column of the table is of native type StringType. - assert(hiveCatalog.getTable("default", "not_skip_hive_metadata").schema + assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType)) sessionState.catalog.createDataSourceTable( - name = TableIdentifier("skip_hive_metadata"), + tableIdent = TableIdentifier("skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], bucketSpec = None, @@ -929,11 +929,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"), isExternal = false) - // As a proxy for verifying that the table was stored in SparkSQL format, - // we verify that the table has a column type as array of StringType. - assert(hiveCatalog.getTable("default", "skip_hive_metadata").schema.forall { c => - HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType) - }) - } + // As a proxy for verifying that the table was stored in SparkSQL format, we verify that + // the table has a column type as array of StringType. + assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema + .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType))) + }) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index 3be2269d3f..d275190744 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -25,8 +25,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle private lazy val df = sqlContext.range(10).coalesce(1).toDF() private def checkTablePath(dbName: String, tableName: String): Unit = { - val metastoreTable = hiveContext.hiveCatalog.getTable(dbName, tableName) - val expectedPath = hiveContext.hiveCatalog.getDatabase(dbName).locationUri + "/" + tableName + val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName) + val expectedPath = + hiveContext.sessionState.catalog.client.getDatabase(dbName).locationUri + "/" + tableName assert(metastoreTable.storage.serdeProperties("path") === expectedPath) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index ae026ed496..151aacbdd1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -121,8 +121,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { intercept[UnsupportedOperationException] { hiveContext.analyze("tempTable") } - hiveContext.sessionState.catalog.dropTable( - TableIdentifier("tempTable"), ignoreIfNotExists = true) + hiveContext.sessionState.catalog.unregisterTable(TableIdentifier("tempTable")) } test("estimates the size of a test MetastoreRelation") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index d59bca4c7e..295069228f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -171,6 +171,10 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(client.listTables("default") === Seq("src")) } + test(s"$version: currentDatabase") { + assert(client.currentDatabase === "default") + } + test(s"$version: getDatabase") { client.getDatabase("default") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 197a123905..5fe85eaef2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -49,7 +49,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { import org.apache.spark.sql.hive.test.TestHive.implicits._ override def beforeAll() { - super.beforeAll() TestHive.cacheTables = true // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*) TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) @@ -58,14 +57,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } override def afterAll() { - try { - TestHive.cacheTables = false - TimeZone.setDefault(originalTimeZone) - Locale.setDefault(originalLocale) - sql("DROP TEMPORARY FUNCTION udtf_count2") - } finally { - super.afterAll() - } + TestHive.cacheTables = false + TimeZone.setDefault(originalTimeZone) + Locale.setDefault(originalLocale) + sql("DROP TEMPORARY FUNCTION udtf_count2") + super.afterAll() } test("SPARK-4908: concurrent hive native commands") { @@ -1213,7 +1209,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { sql("USE hive_test_db") assert("hive_test_db" == sql("select current_database()").first().getString(0)) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { sql("USE not_existing_db") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 6199253d34..bc8896d4bd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1325,7 +1325,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { .format("parquet") .save(path) - // We don't support creating a temporary table while specifying a database val message = intercept[AnalysisException] { sqlContext.sql( s""" @@ -1336,8 +1335,9 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |) """.stripMargin) }.getMessage + assert(message.contains("Specifying database name or other qualifiers are not allowed")) - // If you use backticks to quote the name then it's OK. + // If you use backticks to quote the name of a temporary table having dot in it. sqlContext.sql( s""" |CREATE TEMPORARY TABLE `db.t` diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 92f424bac7..cc412241fb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) } - sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true) + sessionState.catalog.unregisterTable(TableIdentifier("tmp")) } test("overwriting") { @@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(table("t"), data.map(Row.fromTuple)) } - sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true) + sessionState.catalog.unregisterTable(TableIdentifier("tmp")) } test("self-join") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 07fe0ccd87..bb53179c3c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.DataSourceScan import org.apache.spark.sql.execution.command.ExecutedCommand import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} @@ -426,9 +425,10 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { } test("Caching converted data source Parquet Relations") { - def checkCached(tableIdentifier: TableIdentifier): Unit = { + val _catalog = sessionState.catalog + def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = { // Converted test_parquet should be cached. - sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match { + sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK case other => @@ -453,17 +453,17 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default")) + var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet") // First, make sure the converted test_parquet is not cached. - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Table lookup will make the table cached. table("test_insert_parquet") checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. invalidateTable("test_insert_parquet") - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_insert_parquet @@ -476,7 +476,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { sql("select a, b from jt").collect()) // Invalidate the cache. invalidateTable("test_insert_parquet") - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Create a partitioned table. sql( @@ -493,8 +493,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default")) - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test @@ -503,14 +503,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) // Right now, insert into a partitioned Parquet is not supported in data source Parquet. // So, we expect it is not cached. - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test |PARTITION (`date`='2015-04-02') |select a, b from jt """.stripMargin) - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) // Make sure we can cache the partitioned table. table("test_parquet_partitioned_cache_test") @@ -526,7 +526,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin).collect()) invalidateTable("test_parquet_partitioned_cache_test") - assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) + assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") } |