aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-04-18 14:28:47 -0700
committerYin Huai <yhuai@databricks.com>2016-04-18 14:28:47 -0700
commitf1a11976db5cd7a4f2c5467ef784b6755f84260c (patch)
treef45c448acdd112afbcd79cb5b93a2d0d4becffef
parent8bd8121329cb1bb137e935dec124aa23f0fcf8c5 (diff)
downloadspark-f1a11976db5cd7a4f2c5467ef784b6755f84260c.tar.gz
spark-f1a11976db5cd7a4f2c5467ef784b6755f84260c.tar.bz2
spark-f1a11976db5cd7a4f2c5467ef784b6755f84260c.zip
[SPARK-14674][SQL] Move HiveContext.hiveconf to HiveSessionState
## What changes were proposed in this pull request? This is just cleanup. This allows us to remove HiveContext later without inflating the diff too much. This PR fixes the conflicts of https://github.com/apache/spark/pull/12431. It also removes the `def hiveConf` from `HiveSqlParser`. So, we will pass the HiveConf associated with a session explicitly instead of relying on Hive's `SessionState` to pass `HiveConf`. ## How was this patch tested? Existing tests. Closes #12431 Author: Andrew Or <andrew@databricks.com> Author: Yin Huai <yhuai@databricks.com> Closes #12449 from yhuai/hiveconf.
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala4
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala5
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala19
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala18
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala8
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala22
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala18
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala3
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala10
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala4
18 files changed, 59 insertions, 71 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index ee0d23a6e5..6703cdbac3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -55,7 +55,7 @@ object HiveThriftServer2 extends Logging {
@DeveloperApi
def startWithContext(sqlContext: HiveContext): Unit = {
val server = new HiveThriftServer2(sqlContext)
- server.init(sqlContext.hiveconf)
+ server.init(sqlContext.sessionState.hiveconf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
sqlContext.sparkContext.addSparkListener(listener)
@@ -83,7 +83,7 @@ object HiveThriftServer2 extends Logging {
try {
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
- server.init(SparkSQLEnv.hiveContext.hiveconf)
+ server.init(SparkSQLEnv.hiveContext.sessionState.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 2594c5bfdb..ae1d737b58 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -65,9 +65,8 @@ private[hive] object SparkSQLEnv extends Logging {
hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
if (log.isDebugEnabled) {
- hiveContext.hiveconf.getAllProperties.asScala.toSeq.sorted.foreach { case (k, v) =>
- logDebug(s"HiveConf var: $k=$v")
- }
+ hiveContext.sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
+ .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") }
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 71ef99a6a9..b26a9ab699 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -153,7 +153,7 @@ class HiveContext private[hive](
protected[sql] override def parseSql(sql: String): LogicalPlan = {
executionHive.withHiveState {
- super.parseSql(substitutor.substitute(hiveconf, sql))
+ super.parseSql(substitutor.substitute(sessionState.hiveconf, sql))
}
}
@@ -200,28 +200,13 @@ class HiveContext private[hive](
// Also, calling hiveconf will create a default session containing a HiveConf, which
// will interfer with the creation of executionHive (which is a lazy val). So,
// we put hiveconf.set at the end of this method.
- hiveconf.set(key, value)
+ sessionState.hiveconf.set(key, value)
}
override private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
setConf(entry.key, entry.stringConverter(value))
}
- /**
- * SQLConf and HiveConf contracts:
- *
- * 1. create a new o.a.h.hive.ql.session.SessionState for each HiveContext
- * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
- * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
- * set in the SQLConf *as well as* in the HiveConf.
- */
- @transient
- protected[hive] lazy val hiveconf: HiveConf = {
- val c = executionHive.conf
- setConf(c.getAllProperties)
- c
- }
-
private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL).matcher(command)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ccc8345d73..697cf719c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -299,7 +299,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
CatalogTableType.MANAGED_TABLE
}
- val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
+ val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.sessionState.hiveconf)
val dataSource =
DataSource(
hive,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 0cccc22e5a..f91393fc76 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -75,7 +75,7 @@ private[sql] class HiveSessionCatalog(
// ----------------------------------------------------------------
override def getDefaultDBPath(db: String): String = {
- val defaultPath = context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
+ val defaultPath = context.sessionState.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
new Path(new Path(defaultPath), db + ".db").toString
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index bc28b55d06..2b848524f3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import org.apache.hadoop.hive.conf.HiveConf
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -33,6 +35,20 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
/**
+ * SQLConf and HiveConf contracts:
+ *
+ * 1. create a new o.a.h.hive.ql.session.SessionState for each [[HiveContext]]
+ * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+ * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
+ * set in the SQLConf *as well as* in the HiveConf.
+ */
+ lazy val hiveconf: HiveConf = {
+ val c = ctx.executionHive.conf
+ ctx.setConf(c.getAllProperties)
+ c
+ }
+
+ /**
* A Hive client used for execution.
*/
val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
@@ -80,7 +96,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
/**
* Parser for HiveQl query texts.
*/
- override lazy val sqlParser: ParserInterface = HiveSqlParser
+ override lazy val sqlParser: ParserInterface = new HiveSqlParser(hiveconf)
/**
* Planner that takes into account Hive-specific strategies.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 54afe9c2a3..98a427380d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -72,7 +72,9 @@ class HadoopTableReader(
private val _minSplitsPerRDD = if (sc.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
- math.max(sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
+ math.max(
+ sc.sessionState.hiveconf.getInt("mapred.map.tasks", 1),
+ sc.sparkContext.defaultMinPartitions)
}
SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sc.sparkContext.conf, hiveExtraConf)
@@ -162,7 +164,7 @@ class HadoopTableReader(
case (partition, partDeserializer) =>
def updateExistPathSetByPathPattern(pathPatternStr: String) {
val pathPattern = new Path(pathPatternStr)
- val fs = pathPattern.getFileSystem(sc.hiveconf)
+ val fs = pathPattern.getFileSystem(sc.sessionState.hiveconf)
val matches = fs.globStatus(pathPattern)
matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
}
@@ -259,7 +261,7 @@ class HadoopTableReader(
private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
filterOpt match {
case Some(filter) =>
- val fs = path.getFileSystem(sc.hiveconf)
+ val fs = path.getFileSystem(sc.sessionState.hiveconf)
val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
filteredFiles.mkString(",")
case None => path.toString
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index a97b65e27b..d5d3ee43d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -39,8 +39,8 @@ import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
/**
* Concrete parser for HiveQl statements.
*/
-object HiveSqlParser extends AbstractSqlParser {
- val astBuilder = new HiveSqlAstBuilder
+class HiveSqlParser(hiveConf: HiveConf) extends AbstractSqlParser {
+ val astBuilder = new HiveSqlAstBuilder(hiveConf)
override protected def nativeCommand(sqlText: String): LogicalPlan = {
HiveNativeCommand(sqlText)
@@ -50,26 +50,10 @@ object HiveSqlParser extends AbstractSqlParser {
/**
* Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
*/
-class HiveSqlAstBuilder extends SparkSqlAstBuilder {
+class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder {
import ParserUtils._
/**
- * Get the current Hive Configuration.
- */
- private[this] def hiveConf: HiveConf = {
- var ss = SessionState.get()
- // SessionState is lazy initialization, it can be null here
- if (ss == null) {
- val original = Thread.currentThread().getContextClassLoader
- val conf = new HiveConf(classOf[SessionState])
- conf.setClassLoader(original)
- ss = new SessionState(conf)
- SessionState.start(ss)
- }
- ss.getConf
- }
-
- /**
* Pass a command to Hive using a [[HiveNativeCommand]].
*/
override def visitExecuteNativeCommand(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 235b80b7c6..3c46b836dc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -75,7 +75,7 @@ case class HiveTableScan(
// Create a local copy of hiveconf,so that scan specific modifications should not impact
// other queries
@transient
- private[this] val hiveExtraConf = new HiveConf(context.hiveconf)
+ private[this] val hiveExtraConf = new HiveConf(context.sessionState.hiveconf)
// append columns ids and names before broadcast
addColumnMetadataToConf(hiveExtraConf)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 430fa4616f..ed538630d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -44,7 +44,7 @@ case class InsertIntoHiveTable(
ifNotExists: Boolean) extends UnaryNode {
@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
- @transient private lazy val hiveContext = new Context(sc.hiveconf)
+ @transient private lazy val hiveContext = new Context(sc.sessionState.hiveconf)
@transient private lazy val client = sc.metadataHive
def output: Seq[Attribute] = Seq.empty
@@ -86,17 +86,17 @@ case class InsertIntoHiveTable(
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
- val isCompressed = sc.hiveconf.getBoolean(
+ val isCompressed = sc.sessionState.hiveconf.getBoolean(
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
- sc.hiveconf.set("mapred.output.compress", "true")
+ sc.sessionState.hiveconf.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(sc.hiveconf.get("mapred.output.compression.codec"))
- fileSinkConf.setCompressType(sc.hiveconf.get("mapred.output.compression.type"))
+ fileSinkConf.setCompressCodec(sc.sessionState.hiveconf.get("mapred.output.compression.codec"))
+ fileSinkConf.setCompressType(sc.sessionState.hiveconf.get("mapred.output.compression.type"))
}
val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -113,13 +113,13 @@ case class InsertIntoHiveTable(
// Validate partition spec if there exist any dynamic partitions
if (numDynamicPartitions > 0) {
// Report error if dynamic partitioning is not enabled
- if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+ if (!sc.sessionState.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
}
// Report error if dynamic partition strict mode is on but no static partition is found
- if (numStaticPartitions == 0 &&
- sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+ if (numStaticPartitions == 0 && sc.sessionState.hiveconf.getVar(
+ HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
}
@@ -130,7 +130,7 @@ case class InsertIntoHiveTable(
}
}
- val jobConf = new JobConf(sc.hiveconf)
+ val jobConf = new JobConf(sc.sessionState.hiveconf)
val jobConfSer = new SerializableJobConf(jobConf)
// When speculation is on and output committer class name contains "Direct", we should warn
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 3566526561..ea48b0e5c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -64,7 +64,7 @@ case class ScriptTransformation(
override def producedAttributes: AttributeSet = outputSet -- inputSet
- private val serializedHiveConf = new SerializableConfiguration(sc.hiveconf)
+ private val serializedHiveConf = new SerializableConfiguration(sc.sessionState.hiveconf)
protected override def doExecute(): RDD[InternalRow] = {
def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 0c06608ff9..5ef502afa5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -105,7 +105,8 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
.map(_.toLong)
.getOrElse(0L)
- val newTotalSize = getFileSizeForTable(hiveContext.hiveconf, relation.hiveQlTable)
+ val newTotalSize =
+ getFileSizeForTable(hiveContext.sessionState.hiveconf, relation.hiveQlTable)
// Update the Hive metastore if the total size of the table is different than the size
// recorded in the Hive metastore.
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index d56d36fe32..2767528395 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -112,9 +112,9 @@ class TestHiveContext private[hive](
// By clearing the port we force Spark to pick a new one. This allows us to rerun tests
// without restarting the JVM.
System.clearProperty("spark.hostPort")
- CommandProcessorFactory.clean(hiveconf)
+ CommandProcessorFactory.clean(sessionState.hiveconf)
- hiveconf.set("hive.plan.serialization.format", "javaXML")
+ sessionState.hiveconf.set("hive.plan.serialization.format", "javaXML")
// A snapshot of the entries in the starting SQLConf
// We save this because tests can mutate this singleton object if they want
@@ -136,7 +136,7 @@ class TestHiveContext private[hive](
// Override so we can intercept relative paths and rewrite them to point at hive.
override def runSqlHive(sql: String): Seq[String] =
- super.runSqlHive(rewritePaths(substitutor.substitute(this.hiveconf, sql)))
+ super.runSqlHive(rewritePaths(substitutor.substitute(sessionState.hiveconf, sql)))
override def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution(plan)
@@ -461,7 +461,7 @@ class TestHiveContext private[hive](
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
// Some tests corrupt this value on purpose, which breaks the RESET call below.
- hiveconf.set("fs.default.name", new File(".").toURI.toString)
+ sessionState.hiveconf.set("fs.default.name", new File(".").toURI.toString)
// It is important that we RESET first as broken hooks that might have been set could break
// other sql exec here.
executionHive.runSqlHive("RESET")
@@ -476,7 +476,7 @@ class TestHiveContext private[hive](
// In case a test changed any of these values, restore all the original ones here.
TestHiveContext.hiveClientConfigurations(
- hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf)
+ sessionState.hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf)
.foreach { case (k, v) => metadataHive.runSqlHive(s"SET $k=$v") }
defaultOverrides()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
index d9664680f4..b9e7a36b41 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
@@ -131,7 +131,7 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd
* @param token a unique token in the string that should be indicated by the exception
*/
def positionTest(name: String, query: String, token: String): Unit = {
- def ast = HiveSqlParser.parsePlan(query)
+ def ast = hiveContext.sessionState.sqlParser.parsePlan(query)
def parseTree = Try(quietly(ast.treeString)).getOrElse("<failed to parse>")
test(name) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 110c6d19d8..484cf528e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -30,10 +30,11 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike}
-import org.apache.spark.sql.hive.execution.{HiveNativeCommand, HiveSqlParser}
+import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.hive.test.TestHive
class HiveDDLCommandSuite extends PlanTest {
- val parser = HiveSqlParser
+ val parser = TestHive.sessionState.sqlParser
private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
parser.parsePlan(sql).collect {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
index 93dcb10f7a..ac3a65032f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
@@ -24,7 +24,7 @@ class SerializationSuite extends SparkFunSuite {
test("[SPARK-5840] HiveContext should be serializable") {
val hiveContext = org.apache.spark.sql.hive.test.TestHive
- hiveContext.hiveconf
+ hiveContext.sessionState.hiveconf
val serializer = new JavaSerializer(new SparkConf()).newInstance()
val bytes = serializer.serialize(hiveContext)
val deSer = serializer.deserialize[AnyRef](bytes)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 05318f51af..d14c72b34b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -31,7 +31,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
test("parse analyze commands") {
def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
- val parsed = HiveSqlParser.parsePlan(analyzeCommand)
+ val parsed = hiveContext.sessionState.sqlParser.parsePlan(analyzeCommand)
val operators = parsed.collect {
case a: AnalyzeTable => a
case o => o
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index af73baa1f3..2e7a1d921b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1157,11 +1157,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
collectResults(sql(s"SET $testKey=$testVal"))
}
- assert(hiveconf.get(testKey, "") === testVal)
+ assert(sessionState.hiveconf.get(testKey, "") === testVal)
assertResult(defaults ++ Set(testKey -> testVal))(collectResults(sql("SET")))
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
- assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
+ assert(sessionState.hiveconf.get(testKey + testKey, "") == testVal + testVal)
assertResult(defaults ++ Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
collectResults(sql("SET"))
}