author     Reynold Xin <rxin@databricks.com>    2016-04-29 01:14:02 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-29 01:14:02 -0700
commit     054f991c4350af1350af7a4109ee77f4a34822f0 (patch)
tree       ec40f69f6dae5ed63c7247027f47f0b2da9d49c7 /sql/hive
parent     2057cbcb0bc9d5a4fb66006c42457a556d0bb277 (diff)
[SPARK-14994][SQL] Remove execution hive from HiveSessionState
## What changes were proposed in this pull request?
This patch removes executionHive from HiveSessionState and HiveSharedState.
## How was this patch tested?
Updated test cases.
Author: Reynold Xin <rxin@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes #12770 from rxin/SPARK-14994.
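The core of the patch is in HiveSessionState: instead of borrowing its HiveConf from a separately launched execution Hive client, the session now builds one directly from the SparkContext's Hadoop configuration and overlays the Spark conf on top. A minimal standalone sketch of that construction (the helper name and parameter list are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    // Illustrative helper mirroring the new HiveSessionState.hiveconf.
    def newSessionHiveConf(
        hadoopConf: Configuration,
        sparkConfEntries: Array[(String, String)],
        jarClassLoader: ClassLoader): HiveConf = {
      // Passing SessionState as the "owning" class makes HiveConf load
      // hive-site.xml the same way Hive's own SessionState would.
      val hiveConf = new HiveConf(hadoopConf, classOf[SessionState])
      // HiveConf is a Hadoop Configuration and captures the thread's context
      // class loader by default; pin it explicitly so Hive classes resolve
      // through Spark's shared jar class loader.
      hiveConf.setClassLoader(jarClassLoader)
      // Overlay every Spark conf entry so SQL and Hive see the same values.
      sparkConfEntries.foreach { case (k, v) => hiveConf.set(k, v) }
      hiveConf
    }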
Diffstat (limited to 'sql/hive')
9 files changed, 93 insertions, 181 deletions
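Most of the insertions land in InsertIntoHiveTable, which inlines the scratch-directory logic previously delegated to Hive's Context.getExternalTmpPath, so that no execution Hive client is needed at write time. At its core is a pure path computation; a condensed sketch (this standalone function is hypothetical — the real method also qualifies the path against the FileSystem, creates the directory, and marks it deleteOnExit):

    import org.apache.hadoop.fs.Path

    // Place a staging directory under the table's location unless the path
    // already contains one, in which case truncate at that point.
    def stagingPathName(inputPath: Path, stagingDir: String = ".hive-staging"): String = {
      val name = inputPath.toUri.getPath
      val idx = name.indexOf(stagingDir)
      if (idx == -1) new Path(name, stagingDir).toString
      else name.substring(0, idx + stagingDir.length)
    }

The actual staging directory then gets a unique "_hive_<timestamp>_<random>-<taskRunnerId>" suffix, and the final write target is its "-ext-10000" child, matching Hive's own convention.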
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index f70131ec86..456587e0e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
@@ -46,8 +44,7 @@ private[sql] class HiveSessionCatalog(
     sparkSession: SparkSession,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
-    conf: SQLConf,
-    hiveconf: HiveConf)
+    conf: SQLConf)
   extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
 
   override def setCurrentDatabase(db: String): Unit = {
@@ -73,11 +70,6 @@ private[sql] class HiveSessionCatalog(
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
 
-  override def getDefaultDBPath(db: String): String = {
-    val defaultPath = hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
-    new Path(new Path(defaultPath), db + ".db").toString
-  }
-
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
   // essentially a cache for metastore tables. However, it relies on a lot of session-specific
   // things so it would be a lot of work to split its functionality between HiveSessionCatalog
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index e085094383..9608f0b4ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.util.regex.Pattern
-
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
@@ -26,7 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.SessionState
 
@@ -43,11 +41,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
   }
 
   /**
-   * A Hive client used for execution.
-   */
-  lazy val executionHive: HiveClientImpl = sharedState.executionHive.newSession()
-
-  /**
    * A Hive client used for interacting with the metastore.
    */
   lazy val metadataHive: HiveClient = sharedState.metadataHive.newSession()
@@ -61,9 +54,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
    * set in the SQLConf *as well as* in the HiveConf.
    */
   lazy val hiveconf: HiveConf = {
-    val c = executionHive.conf
-    conf.setConf(c.getAllProperties)
-    c
+    val initialConf = new HiveConf(
+      sparkSession.sparkContext.hadoopConfiguration,
+      classOf[org.apache.hadoop.hive.ql.session.SessionState])
+
+    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+    // the initial value will be the current thread's context class loader
+    // (i.e. initClassLoader at here).
+    // We call initialConf.setClassLoader(initClassLoader) at here to make
+    // this action explicit.
+    initialConf.setClassLoader(sparkSession.sharedState.jarClassLoader)
+    sparkSession.sparkContext.conf.getAll.foreach { case (k, v) =>
+      initialConf.set(k, v)
+    }
+    initialConf
   }
 
   setDefaultOverrideConfs()
@@ -78,8 +82,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       sparkSession,
       functionResourceLoader,
       functionRegistry,
-      conf,
-      hiveconf)
+      conf)
   }
 
   /**
@@ -141,16 +144,13 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
 
   override def setConf(key: String, value: String): Unit = {
     super.setConf(key, value)
-    executionHive.runSqlHive(s"SET $key=$value")
     metadataHive.runSqlHive(s"SET $key=$value")
     hiveconf.set(key, value)
   }
 
   override def addJar(path: String): Unit = {
-    super.addJar(path)
-    executionHive.addJar(path)
     metadataHive.addJar(path)
-    Thread.currentThread().setContextClassLoader(executionHive.clientLoader.classLoader)
+    super.addJar(path)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index fb1f59eed3..0ea5ce9196 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -32,13 +32,6 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
   // TODO: just share the IsolatedClientLoader instead of the client instances themselves
 
   /**
-   * A Hive client used for execution.
-   */
-  val executionHive: HiveClientImpl = {
-    HiveUtils.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
-  }
-
-  /**
    * A Hive client used to interact with the metastore.
    */
   // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 0380d2342b..e1950d181d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -33,6 +33,7 @@ import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 /** Factory for `IsolatedClientLoader` with specific versions of hive. */
@@ -278,14 +279,3 @@ private[hive] class IsolatedClientLoader(
    */
   private[hive] var cachedHive: Any = null
 }
-
-/**
- * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
- * This class loader cannot be closed (its `close` method is a no-op).
- */
-private[sql] class NonClosableMutableURLClassLoader(
-    parent: ClassLoader)
-  extends MutableURLClassLoader(Array.empty, parent) {
-
-  override def close(): Unit = {}
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index cba10caf98..73ccec2ee0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,11 +17,19 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
 import java.util
+import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
+import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
 import org.apache.spark.rdd.RDD
@@ -46,6 +54,61 @@ case class InsertIntoHiveTable(
 
   def output: Seq[Attribute] = Seq.empty
 
+  val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+    val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+    return executionId
+  }
+
+  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    val stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+      }
+      fs.deleteOnExit(dir)
+    }
+    catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+
+    }
+    return dir
+  }
+
+  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  }
+
+  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+    }
+  }
+
+  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  }
+
   private def saveAsHiveFile(
       rdd: RDD[InternalRow],
       valueClass: Class[_],
@@ -81,7 +144,7 @@ case class InsertIntoHiveTable(
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
     val hadoopConf = sessionState.newHadoopConf()
-    val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
+    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed =
       sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ddb72fb1e1..c4a3a74b9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -136,7 +136,8 @@ private[hive] class TestHiveSparkSession(
   }
 
   @transient
-  override lazy val sessionState: TestHiveSessionState = new TestHiveSessionState(self)
+  override lazy val sessionState: TestHiveSessionState =
+    new TestHiveSessionState(self, warehousePath)
 
   override def newSession(): TestHiveSparkSession = {
     new TestHiveSparkSession(
@@ -156,19 +157,8 @@ private[hive] class TestHiveSparkSession(
 
   sessionState.hiveconf.set("hive.plan.serialization.format", "javaXML")
 
-  // A snapshot of the entries in the starting SQLConf
-  // We save this because tests can mutate this singleton object if they want
-  // This snapshot is saved when we create this TestHiveSparkSession.
-  val initialSQLConf: SQLConf = {
-    val snapshot = new SQLConf
-    sessionState.conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
-    snapshot
-  }
-
-  val testTempDir = Utils.createTempDir()
-
   // For some hive test case which contain ${system:test.tmp.dir}
-  System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)
+  System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath)
 
   /** The location of the compiled hive distribution */
   lazy val hiveHome = envVarToFile("HIVE_HOME")
@@ -521,8 +511,10 @@ private[hive] class TestHiveSharedState(
 
 }
 
-private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
-  extends HiveSessionState(sparkSession) {
+private[hive] class TestHiveSessionState(
+    sparkSession: TestHiveSparkSession,
+    warehousePath: File)
+  extends HiveSessionState(sparkSession) { self =>
 
   override lazy val conf: SQLConf = {
     new SQLConf {
@@ -530,9 +522,8 @@ private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
       override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
       override def clear(): Unit = {
         super.clear()
-        TestHiveContext.overrideConfs.map {
-          case (key, value) => setConfString(key, value)
-        }
+        TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
+        setConfString("hive.metastore.warehouse.dir", self.warehousePath.toURI.toString)
       }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
deleted file mode 100644
index b2c0f7e0e5..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.hive.test.TestHive
-
-
-class HiveContextSuite extends SparkFunSuite {
-
-  test("HiveContext can access `spark.sql.*` configs") {
-    // Avoid creating another SparkContext in the same JVM
-    val sc = TestHive.sparkContext
-    require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") ==
-      "org.apache.spark.sql.hive.execution.PairSerDe")
-    assert(TestHive.sparkSession.initialSQLConf.getConfString(
-      "spark.sql.hive.metastore.barrierPrefixes") ==
-      "org.apache.spark.sql.hive.execution.PairSerDe")
-    // This setting should be also set in the hiveconf of the current session.
-    assert(TestHive.sessionState.hiveconf.get(
-      "spark.sql.hive.metastore.barrierPrefixes", "") ==
-      "org.apache.spark.sql.hive.execution.PairSerDe")
-  }
-
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
deleted file mode 100644
index ac3a65032f..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.serializer.JavaSerializer
-
-class SerializationSuite extends SparkFunSuite {
-
-  test("[SPARK-5840] HiveContext should be serializable") {
-    val hiveContext = org.apache.spark.sql.hive.test.TestHive
-    hiveContext.sessionState.hiveconf
-    val serializer = new JavaSerializer(new SparkConf()).newInstance()
-    val bytes = serializer.serialize(hiveContext)
-    val deSer = serializer.deserialize[AnyRef](bytes)
-  }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e5a7706cc5..3bf0e84267 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1070,51 +1070,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     assert(getConf(testKey, "0") == "")
   }
 
-  test("SET commands semantics for a HiveContext") {
-    // Adapted from its SQL counterpart.
-    val testKey = "spark.sql.key.usedfortestonly"
-    val testVal = "test.val.0"
-    val nonexistentKey = "nonexistent"
-    def collectResults(df: DataFrame): Set[Any] =
-      df.collect().map {
-        case Row(key: String, value: String) => key -> value
-        case Row(key: String, defaultValue: String, doc: String) => (key, defaultValue, doc)
-      }.toSet
-    conf.clear()
-
-    val expectedConfs = conf.getAllDefinedConfs.toSet
-    assertResult(expectedConfs)(collectResults(sql("SET -v")))
-
-    // "SET" itself returns all config variables currently specified in SQLConf.
-    // TODO: Should we be listing the default here always? probably...
-    assert(sql("SET").collect().size === TestHiveContext.overrideConfs.size)
-
-    val defaults = collectResults(sql("SET"))
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey=$testVal"))
-    }
-
-    assert(sessionState.hiveconf.get(testKey, "") === testVal)
-    assertResult(defaults ++ Set(testKey -> testVal))(collectResults(sql("SET")))
-
-    sql(s"SET ${testKey + testKey}=${testVal + testVal}")
-    assert(sessionState.hiveconf.get(testKey + testKey, "") == testVal + testVal)
-    assertResult(defaults ++ Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
-      collectResults(sql("SET"))
-    }
-
-    // "SET key"
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey"))
-    }
-
-    assertResult(Set(nonexistentKey -> "<undefined>")) {
-      collectResults(sql(s"SET $nonexistentKey"))
-    }
-
-    conf.clear()
-  }
-
   test("current_database with multiple sessions") {
     sql("create database a")
     sql("use a")