aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala4
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala5
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala19
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala18
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala8
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala22
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala18
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala3
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala10
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala4
18 files changed, 59 insertions, 71 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index ee0d23a6e5..6703cdbac3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -55,7 +55,7 @@ object HiveThriftServer2 extends Logging {
@DeveloperApi
def startWithContext(sqlContext: HiveContext): Unit = {
val server = new HiveThriftServer2(sqlContext)
- server.init(sqlContext.hiveconf)
+ server.init(sqlContext.sessionState.hiveconf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
sqlContext.sparkContext.addSparkListener(listener)
@@ -83,7 +83,7 @@ object HiveThriftServer2 extends Logging {
try {
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
- server.init(SparkSQLEnv.hiveContext.hiveconf)
+ server.init(SparkSQLEnv.hiveContext.sessionState.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 2594c5bfdb..ae1d737b58 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -65,9 +65,8 @@ private[hive] object SparkSQLEnv extends Logging {
hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
if (log.isDebugEnabled) {
- hiveContext.hiveconf.getAllProperties.asScala.toSeq.sorted.foreach { case (k, v) =>
- logDebug(s"HiveConf var: $k=$v")
- }
+ hiveContext.sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
+ .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") }
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 71ef99a6a9..b26a9ab699 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -153,7 +153,7 @@ class HiveContext private[hive](
protected[sql] override def parseSql(sql: String): LogicalPlan = {
executionHive.withHiveState {
- super.parseSql(substitutor.substitute(hiveconf, sql))
+ super.parseSql(substitutor.substitute(sessionState.hiveconf, sql))
}
}
@@ -200,28 +200,13 @@ class HiveContext private[hive](
// Also, calling hiveconf will create a default session containing a HiveConf, which
// will interfer with the creation of executionHive (which is a lazy val). So,
// we put hiveconf.set at the end of this method.
- hiveconf.set(key, value)
+ sessionState.hiveconf.set(key, value)
}
override private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
setConf(entry.key, entry.stringConverter(value))
}
- /**
- * SQLConf and HiveConf contracts:
- *
- * 1. create a new o.a.h.hive.ql.session.SessionState for each HiveContext
- * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
- * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
- * set in the SQLConf *as well as* in the HiveConf.
- */
- @transient
- protected[hive] lazy val hiveconf: HiveConf = {
- val c = executionHive.conf
- setConf(c.getAllProperties)
- c
- }
-
private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL).matcher(command)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ccc8345d73..697cf719c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -299,7 +299,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
CatalogTableType.MANAGED_TABLE
}
- val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
+ val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.sessionState.hiveconf)
val dataSource =
DataSource(
hive,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 0cccc22e5a..f91393fc76 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -75,7 +75,7 @@ private[sql] class HiveSessionCatalog(
// ----------------------------------------------------------------
override def getDefaultDBPath(db: String): String = {
- val defaultPath = context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
+ val defaultPath = context.sessionState.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
new Path(new Path(defaultPath), db + ".db").toString
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index bc28b55d06..2b848524f3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import org.apache.hadoop.hive.conf.HiveConf
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -33,6 +35,20 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
/**
+ * SQLConf and HiveConf contracts:
+ *
+ * 1. create a new o.a.h.hive.ql.session.SessionState for each [[HiveContext]]
+ * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+ * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
+ * set in the SQLConf *as well as* in the HiveConf.
+ */
+ lazy val hiveconf: HiveConf = {
+ val c = ctx.executionHive.conf
+ ctx.setConf(c.getAllProperties)
+ c
+ }
+
+ /**
* A Hive client used for execution.
*/
val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
@@ -80,7 +96,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
/**
* Parser for HiveQl query texts.
*/
- override lazy val sqlParser: ParserInterface = HiveSqlParser
+ override lazy val sqlParser: ParserInterface = new HiveSqlParser(hiveconf)
/**
* Planner that takes into account Hive-specific strategies.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 54afe9c2a3..98a427380d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -72,7 +72,9 @@ class HadoopTableReader(
private val _minSplitsPerRDD = if (sc.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
- math.max(sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
+ math.max(
+ sc.sessionState.hiveconf.getInt("mapred.map.tasks", 1),
+ sc.sparkContext.defaultMinPartitions)
}
SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sc.sparkContext.conf, hiveExtraConf)
@@ -162,7 +164,7 @@ class HadoopTableReader(
case (partition, partDeserializer) =>
def updateExistPathSetByPathPattern(pathPatternStr: String) {
val pathPattern = new Path(pathPatternStr)
- val fs = pathPattern.getFileSystem(sc.hiveconf)
+ val fs = pathPattern.getFileSystem(sc.sessionState.hiveconf)
val matches = fs.globStatus(pathPattern)
matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
}
@@ -259,7 +261,7 @@ class HadoopTableReader(
private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
filterOpt match {
case Some(filter) =>
- val fs = path.getFileSystem(sc.hiveconf)
+ val fs = path.getFileSystem(sc.sessionState.hiveconf)
val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
filteredFiles.mkString(",")
case None => path.toString
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index a97b65e27b..d5d3ee43d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -39,8 +39,8 @@ import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
/**
* Concrete parser for HiveQl statements.
*/
-object HiveSqlParser extends AbstractSqlParser {
- val astBuilder = new HiveSqlAstBuilder
+class HiveSqlParser(hiveConf: HiveConf) extends AbstractSqlParser {
+ val astBuilder = new HiveSqlAstBuilder(hiveConf)
override protected def nativeCommand(sqlText: String): LogicalPlan = {
HiveNativeCommand(sqlText)
@@ -50,26 +50,10 @@ object HiveSqlParser extends AbstractSqlParser {
/**
* Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
*/
-class HiveSqlAstBuilder extends SparkSqlAstBuilder {
+class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder {
import ParserUtils._
/**
- * Get the current Hive Configuration.
- */
- private[this] def hiveConf: HiveConf = {
- var ss = SessionState.get()
- // SessionState is lazy initialization, it can be null here
- if (ss == null) {
- val original = Thread.currentThread().getContextClassLoader
- val conf = new HiveConf(classOf[SessionState])
- conf.setClassLoader(original)
- ss = new SessionState(conf)
- SessionState.start(ss)
- }
- ss.getConf
- }
-
- /**
* Pass a command to Hive using a [[HiveNativeCommand]].
*/
override def visitExecuteNativeCommand(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 235b80b7c6..3c46b836dc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -75,7 +75,7 @@ case class HiveTableScan(
// Create a local copy of hiveconf,so that scan specific modifications should not impact
// other queries
@transient
- private[this] val hiveExtraConf = new HiveConf(context.hiveconf)
+ private[this] val hiveExtraConf = new HiveConf(context.sessionState.hiveconf)
// append columns ids and names before broadcast
addColumnMetadataToConf(hiveExtraConf)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 430fa4616f..ed538630d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -44,7 +44,7 @@ case class InsertIntoHiveTable(
ifNotExists: Boolean) extends UnaryNode {
@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
- @transient private lazy val hiveContext = new Context(sc.hiveconf)
+ @transient private lazy val hiveContext = new Context(sc.sessionState.hiveconf)
@transient private lazy val client = sc.metadataHive
def output: Seq[Attribute] = Seq.empty
@@ -86,17 +86,17 @@ case class InsertIntoHiveTable(
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
- val isCompressed = sc.hiveconf.getBoolean(
+ val isCompressed = sc.sessionState.hiveconf.getBoolean(
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
- sc.hiveconf.set("mapred.output.compress", "true")
+ sc.sessionState.hiveconf.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(sc.hiveconf.get("mapred.output.compression.codec"))
- fileSinkConf.setCompressType(sc.hiveconf.get("mapred.output.compression.type"))
+ fileSinkConf.setCompressCodec(sc.sessionState.hiveconf.get("mapred.output.compression.codec"))
+ fileSinkConf.setCompressType(sc.sessionState.hiveconf.get("mapred.output.compression.type"))
}
val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -113,13 +113,13 @@ case class InsertIntoHiveTable(
// Validate partition spec if there exist any dynamic partitions
if (numDynamicPartitions > 0) {
// Report error if dynamic partitioning is not enabled
- if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+ if (!sc.sessionState.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
}
// Report error if dynamic partition strict mode is on but no static partition is found
- if (numStaticPartitions == 0 &&
- sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+ if (numStaticPartitions == 0 && sc.sessionState.hiveconf.getVar(
+ HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
}
@@ -130,7 +130,7 @@ case class InsertIntoHiveTable(
}
}
- val jobConf = new JobConf(sc.hiveconf)
+ val jobConf = new JobConf(sc.sessionState.hiveconf)
val jobConfSer = new SerializableJobConf(jobConf)
// When speculation is on and output committer class name contains "Direct", we should warn
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 3566526561..ea48b0e5c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -64,7 +64,7 @@ case class ScriptTransformation(
override def producedAttributes: AttributeSet = outputSet -- inputSet
- private val serializedHiveConf = new SerializableConfiguration(sc.hiveconf)
+ private val serializedHiveConf = new SerializableConfiguration(sc.sessionState.hiveconf)
protected override def doExecute(): RDD[InternalRow] = {
def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 0c06608ff9..5ef502afa5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -105,7 +105,8 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
.map(_.toLong)
.getOrElse(0L)
- val newTotalSize = getFileSizeForTable(hiveContext.hiveconf, relation.hiveQlTable)
+ val newTotalSize =
+ getFileSizeForTable(hiveContext.sessionState.hiveconf, relation.hiveQlTable)
// Update the Hive metastore if the total size of the table is different than the size
// recorded in the Hive metastore.
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index d56d36fe32..2767528395 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -112,9 +112,9 @@ class TestHiveContext private[hive](
// By clearing the port we force Spark to pick a new one. This allows us to rerun tests
// without restarting the JVM.
System.clearProperty("spark.hostPort")
- CommandProcessorFactory.clean(hiveconf)
+ CommandProcessorFactory.clean(sessionState.hiveconf)
- hiveconf.set("hive.plan.serialization.format", "javaXML")
+ sessionState.hiveconf.set("hive.plan.serialization.format", "javaXML")
// A snapshot of the entries in the starting SQLConf
// We save this because tests can mutate this singleton object if they want
@@ -136,7 +136,7 @@ class TestHiveContext private[hive](
// Override so we can intercept relative paths and rewrite them to point at hive.
override def runSqlHive(sql: String): Seq[String] =
- super.runSqlHive(rewritePaths(substitutor.substitute(this.hiveconf, sql)))
+ super.runSqlHive(rewritePaths(substitutor.substitute(sessionState.hiveconf, sql)))
override def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution(plan)
@@ -461,7 +461,7 @@ class TestHiveContext private[hive](
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
// Some tests corrupt this value on purpose, which breaks the RESET call below.
- hiveconf.set("fs.default.name", new File(".").toURI.toString)
+ sessionState.hiveconf.set("fs.default.name", new File(".").toURI.toString)
// It is important that we RESET first as broken hooks that might have been set could break
// other sql exec here.
executionHive.runSqlHive("RESET")
@@ -476,7 +476,7 @@ class TestHiveContext private[hive](
// In case a test changed any of these values, restore all the original ones here.
TestHiveContext.hiveClientConfigurations(
- hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf)
+ sessionState.hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf)
.foreach { case (k, v) => metadataHive.runSqlHive(s"SET $k=$v") }
defaultOverrides()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
index d9664680f4..b9e7a36b41 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
@@ -131,7 +131,7 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd
* @param token a unique token in the string that should be indicated by the exception
*/
def positionTest(name: String, query: String, token: String): Unit = {
- def ast = HiveSqlParser.parsePlan(query)
+ def ast = hiveContext.sessionState.sqlParser.parsePlan(query)
def parseTree = Try(quietly(ast.treeString)).getOrElse("<failed to parse>")
test(name) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 110c6d19d8..484cf528e6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -30,10 +30,11 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike}
-import org.apache.spark.sql.hive.execution.{HiveNativeCommand, HiveSqlParser}
+import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.hive.test.TestHive
class HiveDDLCommandSuite extends PlanTest {
- val parser = HiveSqlParser
+ val parser = TestHive.sessionState.sqlParser
private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
parser.parsePlan(sql).collect {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
index 93dcb10f7a..ac3a65032f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
@@ -24,7 +24,7 @@ class SerializationSuite extends SparkFunSuite {
test("[SPARK-5840] HiveContext should be serializable") {
val hiveContext = org.apache.spark.sql.hive.test.TestHive
- hiveContext.hiveconf
+ hiveContext.sessionState.hiveconf
val serializer = new JavaSerializer(new SparkConf()).newInstance()
val bytes = serializer.serialize(hiveContext)
val deSer = serializer.deserialize[AnyRef](bytes)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 05318f51af..d14c72b34b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -31,7 +31,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
test("parse analyze commands") {
def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
- val parsed = HiveSqlParser.parsePlan(analyzeCommand)
+ val parsed = hiveContext.sessionState.sqlParser.parsePlan(analyzeCommand)
val operators = parsed.collect {
case a: AnalyzeTable => a
case o => o
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index af73baa1f3..2e7a1d921b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1157,11 +1157,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
collectResults(sql(s"SET $testKey=$testVal"))
}
- assert(hiveconf.get(testKey, "") === testVal)
+ assert(sessionState.hiveconf.get(testKey, "") === testVal)
assertResult(defaults ++ Set(testKey -> testVal))(collectResults(sql("SET")))
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
- assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
+ assert(sessionState.hiveconf.get(testKey + testKey, "") == testVal + testVal)
assertResult(defaults ++ Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
collectResults(sql("SET"))
}