path: root/sql/hive
author      Reynold Xin <rxin@databricks.com>   2016-04-29 01:14:02 -0700
committer   Reynold Xin <rxin@databricks.com>   2016-04-29 01:14:02 -0700
commit      054f991c4350af1350af7a4109ee77f4a34822f0 (patch)
tree        ec40f69f6dae5ed63c7247027f47f0b2da9d49c7 /sql/hive
parent      2057cbcb0bc9d5a4fb66006c42457a556d0bb277 (diff)
[SPARK-14994][SQL] Remove execution hive from HiveSessionState
## What changes were proposed in this pull request?

This patch removes executionHive from HiveSessionState and HiveSharedState.

## How was this patch tested?

Updated test cases.

Author: Reynold Xin <rxin@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12770 from rxin/SPARK-14994.
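In effect, HiveSessionState.hiveconf is no longer derived from executionHive's configuration; it is now built directly from the SparkContext's Hadoop configuration, pointed at the shared jar class loader, and seeded with the Spark conf entries. A minimal sketch of that initializer, assuming the Spark 2.0-era HiveConf and Hadoop APIs (buildHiveConf is a hypothetical helper name, not part of the patch — see the HiveSessionState hunk below for the actual code):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hive.conf.HiveConf

    // Sketch of the new hiveconf construction in HiveSessionState.
    def buildHiveConf(
        hadoopConf: Configuration,
        sparkConfEntries: Seq[(String, String)],
        jarClassLoader: ClassLoader): HiveConf = {
      // Start from the SparkContext's Hadoop configuration instead of executionHive's conf.
      val hiveConf =
        new HiveConf(hadoopConf, classOf[org.apache.hadoop.hive.ql.session.SessionState])
      // Set the class loader explicitly so classes added via ADD JAR resolve correctly.
      hiveConf.setClassLoader(jarClassLoader)
      // Copy Spark settings so they are also visible through the HiveConf.
      sparkConfEntries.foreach { case (k, v) => hiveConf.set(k, v) }
      hiveConf
    }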
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 34
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala | 7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 67
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 27
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala | 40
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala | 32
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 45
9 files changed, 93 insertions, 181 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index f70131ec86..456587e0e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
@@ -46,8 +44,7 @@ private[sql] class HiveSessionCatalog(
sparkSession: SparkSession,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
- conf: SQLConf,
- hiveconf: HiveConf)
+ conf: SQLConf)
extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
override def setCurrentDatabase(db: String): Unit = {
@@ -73,11 +70,6 @@ private[sql] class HiveSessionCatalog(
// | Methods and fields for interacting with HiveMetastoreCatalog |
// ----------------------------------------------------------------
- override def getDefaultDBPath(db: String): String = {
- val defaultPath = hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
- new Path(new Path(defaultPath), db + ".db").toString
- }
-
// Catalog for handling data source tables. TODO: This really doesn't belong here since it is
// essentially a cache for metastore tables. However, it relies on a lot of session-specific
// things so it would be a lot of work to split its functionality between HiveSessionCatalog
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index e085094383..9608f0b4ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hive
-import java.util.regex.Pattern
-
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -26,7 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.SessionState
@@ -43,11 +41,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
}
/**
- * A Hive client used for execution.
- */
- lazy val executionHive: HiveClientImpl = sharedState.executionHive.newSession()
-
- /**
* A Hive client used for interacting with the metastore.
*/
lazy val metadataHive: HiveClient = sharedState.metadataHive.newSession()
@@ -61,9 +54,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
* set in the SQLConf *as well as* in the HiveConf.
*/
lazy val hiveconf: HiveConf = {
- val c = executionHive.conf
- conf.setConf(c.getAllProperties)
- c
+ val initialConf = new HiveConf(
+ sparkSession.sparkContext.hadoopConfiguration,
+ classOf[org.apache.hadoop.hive.ql.session.SessionState])
+
+ // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+ // the initial value will be the current thread's context class loader
+ // (i.e. initClassLoader at here).
+ // We call initialConf.setClassLoader(initClassLoader) at here to make
+ // this action explicit.
+ initialConf.setClassLoader(sparkSession.sharedState.jarClassLoader)
+ sparkSession.sparkContext.conf.getAll.foreach { case (k, v) =>
+ initialConf.set(k, v)
+ }
+ initialConf
}
setDefaultOverrideConfs()
@@ -78,8 +82,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
sparkSession,
functionResourceLoader,
functionRegistry,
- conf,
- hiveconf)
+ conf)
}
/**
@@ -141,16 +144,13 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override def setConf(key: String, value: String): Unit = {
super.setConf(key, value)
- executionHive.runSqlHive(s"SET $key=$value")
metadataHive.runSqlHive(s"SET $key=$value")
hiveconf.set(key, value)
}
override def addJar(path: String): Unit = {
- super.addJar(path)
- executionHive.addJar(path)
metadataHive.addJar(path)
- Thread.currentThread().setContextClassLoader(executionHive.clientLoader.classLoader)
+ super.addJar(path)
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index fb1f59eed3..0ea5ce9196 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -32,13 +32,6 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
// TODO: just share the IsolatedClientLoader instead of the client instances themselves
/**
- * A Hive client used for execution.
- */
- val executionHive: HiveClientImpl = {
- HiveUtils.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
- }
-
- /**
* A Hive client used to interact with the metastore.
*/
// This needs to be a lazy val at here because TestHiveSharedState is overriding it.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 0380d2342b..e1950d181d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -33,6 +33,7 @@ import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
import org.apache.spark.util.{MutableURLClassLoader, Utils}
/** Factory for `IsolatedClientLoader` with specific versions of hive. */
@@ -278,14 +279,3 @@ private[hive] class IsolatedClientLoader(
*/
private[hive] var cachedHive: Any = null
}
-
-/**
- * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
- * This class loader cannot be closed (its `close` method is a no-op).
- */
-private[sql] class NonClosableMutableURLClassLoader(
- parent: ClassLoader)
- extends MutableURLClassLoader(Array.empty, parent) {
-
- override def close(): Unit = {}
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index cba10caf98..73ccec2ee0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,11 +17,19 @@
package org.apache.spark.sql.hive.execution
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
import java.util
+import java.util.{Date, Random}
import scala.collection.JavaConverters._
-import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
+import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
@@ -46,6 +54,61 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = Seq.empty
+ val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
+ private def executionId: String = {
+ val rand: Random = new Random
+ val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+ val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+ return executionId
+ }
+
+ private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+ val inputPathUri: URI = inputPath.toUri
+ val inputPathName: String = inputPathUri.getPath
+ val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+ val stagingPathName: String =
+ if (inputPathName.indexOf(stagingDir) == -1) {
+ new Path(inputPathName, stagingDir).toString
+ } else {
+ inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+ }
+ val dir: Path =
+ fs.makeQualified(
+ new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+ logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+ try {
+ if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+ throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+ }
+ fs.deleteOnExit(dir)
+ }
+ catch {
+ case e: IOException =>
+ throw new RuntimeException(
+ "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+
+ }
+ return dir
+ }
+
+ private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+ getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+ }
+
+ def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+ val extURI: URI = path.toUri
+ if (extURI.getScheme == "viewfs") {
+ getExtTmpPathRelTo(path.getParent, hadoopConf)
+ } else {
+ new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+ }
+ }
+
+ def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+ new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+ }
+
private def saveAsHiveFile(
rdd: RDD[InternalRow],
valueClass: Class[_],
@@ -81,7 +144,7 @@ case class InsertIntoHiveTable(
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val hadoopConf = sessionState.newHadoopConf()
- val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
+ val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed =
sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ddb72fb1e1..c4a3a74b9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -136,7 +136,8 @@ private[hive] class TestHiveSparkSession(
}
@transient
- override lazy val sessionState: TestHiveSessionState = new TestHiveSessionState(self)
+ override lazy val sessionState: TestHiveSessionState =
+ new TestHiveSessionState(self, warehousePath)
override def newSession(): TestHiveSparkSession = {
new TestHiveSparkSession(
@@ -156,19 +157,8 @@ private[hive] class TestHiveSparkSession(
sessionState.hiveconf.set("hive.plan.serialization.format", "javaXML")
- // A snapshot of the entries in the starting SQLConf
- // We save this because tests can mutate this singleton object if they want
- // This snapshot is saved when we create this TestHiveSparkSession.
- val initialSQLConf: SQLConf = {
- val snapshot = new SQLConf
- sessionState.conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
- snapshot
- }
-
- val testTempDir = Utils.createTempDir()
-
// For some hive test case which contain ${system:test.tmp.dir}
- System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)
+ System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath)
/** The location of the compiled hive distribution */
lazy val hiveHome = envVarToFile("HIVE_HOME")
@@ -521,8 +511,10 @@ private[hive] class TestHiveSharedState(
}
-private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
- extends HiveSessionState(sparkSession) {
+private[hive] class TestHiveSessionState(
+ sparkSession: TestHiveSparkSession,
+ warehousePath: File)
+ extends HiveSessionState(sparkSession) { self =>
override lazy val conf: SQLConf = {
new SQLConf {
@@ -530,9 +522,8 @@ private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
override def clear(): Unit = {
super.clear()
- TestHiveContext.overrideConfs.map {
- case (key, value) => setConfString(key, value)
- }
+ TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
+ setConfString("hive.metastore.warehouse.dir", self.warehousePath.toURI.toString)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
deleted file mode 100644
index b2c0f7e0e5..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.hive.test.TestHive
-
-
-class HiveContextSuite extends SparkFunSuite {
-
- test("HiveContext can access `spark.sql.*` configs") {
- // Avoid creating another SparkContext in the same JVM
- val sc = TestHive.sparkContext
- require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") ==
- "org.apache.spark.sql.hive.execution.PairSerDe")
- assert(TestHive.sparkSession.initialSQLConf.getConfString(
- "spark.sql.hive.metastore.barrierPrefixes") ==
- "org.apache.spark.sql.hive.execution.PairSerDe")
- // This setting should be also set in the hiveconf of the current session.
- assert(TestHive.sessionState.hiveconf.get(
- "spark.sql.hive.metastore.barrierPrefixes", "") ==
- "org.apache.spark.sql.hive.execution.PairSerDe")
- }
-
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
deleted file mode 100644
index ac3a65032f..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.serializer.JavaSerializer
-
-class SerializationSuite extends SparkFunSuite {
-
- test("[SPARK-5840] HiveContext should be serializable") {
- val hiveContext = org.apache.spark.sql.hive.test.TestHive
- hiveContext.sessionState.hiveconf
- val serializer = new JavaSerializer(new SparkConf()).newInstance()
- val bytes = serializer.serialize(hiveContext)
- val deSer = serializer.deserialize[AnyRef](bytes)
- }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e5a7706cc5..3bf0e84267 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1070,51 +1070,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assert(getConf(testKey, "0") == "")
}
- test("SET commands semantics for a HiveContext") {
- // Adapted from its SQL counterpart.
- val testKey = "spark.sql.key.usedfortestonly"
- val testVal = "test.val.0"
- val nonexistentKey = "nonexistent"
- def collectResults(df: DataFrame): Set[Any] =
- df.collect().map {
- case Row(key: String, value: String) => key -> value
- case Row(key: String, defaultValue: String, doc: String) => (key, defaultValue, doc)
- }.toSet
- conf.clear()
-
- val expectedConfs = conf.getAllDefinedConfs.toSet
- assertResult(expectedConfs)(collectResults(sql("SET -v")))
-
- // "SET" itself returns all config variables currently specified in SQLConf.
- // TODO: Should we be listing the default here always? probably...
- assert(sql("SET").collect().size === TestHiveContext.overrideConfs.size)
-
- val defaults = collectResults(sql("SET"))
- assertResult(Set(testKey -> testVal)) {
- collectResults(sql(s"SET $testKey=$testVal"))
- }
-
- assert(sessionState.hiveconf.get(testKey, "") === testVal)
- assertResult(defaults ++ Set(testKey -> testVal))(collectResults(sql("SET")))
-
- sql(s"SET ${testKey + testKey}=${testVal + testVal}")
- assert(sessionState.hiveconf.get(testKey + testKey, "") == testVal + testVal)
- assertResult(defaults ++ Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
- collectResults(sql("SET"))
- }
-
- // "SET key"
- assertResult(Set(testKey -> testVal)) {
- collectResults(sql(s"SET $testKey"))
- }
-
- assertResult(Set(nonexistentKey -> "<undefined>")) {
- collectResults(sql(s"SET $nonexistentKey"))
- }
-
- conf.clear()
- }
-
test("current_database with multiple sessions") {
sql("create database a")
sql("use a")