author     Andrew Or <andrew@databricks.com>  2016-04-25 13:23:05 -0700
committer  Andrew Or <andrew@databricks.com>  2016-04-25 13:23:05 -0700
commit     3c5e65c339a9b4d5e01375d7f073e444898d34c8 (patch)
tree       039f7e382124f03495e9b22cdc00df7791affeb7 /sql/hive-thriftserver
parent     6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b (diff)
[SPARK-14721][SQL] Remove HiveContext (part 2)
## What changes were proposed in this pull request?

This removes the class `HiveContext` itself along with all code usages associated with it. The bulk of the work was already done in #12485. This is mainly just code cleanup and actually removing the class.

Note: A couple of things will break after this patch. These will be fixed separately.
- the python HiveContext
- all the documentation / comments referencing HiveContext
- there will be no more HiveContext in the REPL (fixed by #12589)

## How was this patch tested?

No change in functionality.

Author: Andrew Or <andrew@databricks.com>

Closes #12585 from andrewor14/delete-hive-context.
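For downstream code that constructed a `HiveContext` directly, a minimal migration sketch is below. It relies on the `SparkSession.withHiveSupport(...).wrapped` entry point that this patch itself uses in `SparkSQLEnv.init()`; treat it as an illustration of the pattern, not the only supported path.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SparkSession, SQLContext}

val sparkContext = new SparkContext(new SparkConf().setAppName("hivecontext-migration"))

// Before this patch:
//   val hiveContext = new HiveContext(sparkContext)

// After this patch: obtain a Hive-enabled SQLContext through SparkSession,
// mirroring what SparkSQLEnv.init() now does in this diff.
val sqlContext: SQLContext = SparkSession.withHiveSupport(sparkContext).wrapped
sqlContext.sql("SHOW TABLES").show()
```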
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala | 17
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 22
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 2
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala | 8
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala | 6
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala | 23
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala | 12
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 12
8 files changed, 54 insertions(+), 48 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 6703cdbac3..24a25023a6 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -33,7 +33,8 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hive.HiveSessionState
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.sql.internal.SQLConf
@@ -53,9 +54,9 @@ object HiveThriftServer2 extends Logging {
* Starts a new thrift server with the given context.
*/
@DeveloperApi
- def startWithContext(sqlContext: HiveContext): Unit = {
+ def startWithContext(sqlContext: SQLContext): Unit = {
val server = new HiveThriftServer2(sqlContext)
- server.init(sqlContext.sessionState.hiveconf)
+ server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
sqlContext.sparkContext.addSparkListener(listener)
@@ -82,11 +83,11 @@ object HiveThriftServer2 extends Logging {
}
try {
- val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
- server.init(SparkSQLEnv.hiveContext.sessionState.hiveconf)
+ val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
+ server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
- listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
+ listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
SparkSQLEnv.sparkContext.addSparkListener(listener)
uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
@@ -261,7 +262,7 @@ object HiveThriftServer2 extends Logging {
}
}
-private[hive] class HiveThriftServer2(hiveContext: HiveContext)
+private[hive] class HiveThriftServer2(sqlContext: SQLContext)
extends HiveServer2
with ReflectedCompositeService {
// state is tracked internally so that the server only attempts to shut down if it successfully
@@ -269,7 +270,7 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext)
private val started = new AtomicBoolean(false)
override def init(hiveConf: HiveConf) {
- val sparkSqlCliService = new SparkSQLCLIService(this, hiveContext)
+ val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
setSuperField(this, "cliService", sparkSqlCliService)
addService(sparkSqlCliService)
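Note that `startWithContext` now accepts any `SQLContext`, but `init` immediately casts its `sessionState` to `HiveSessionState`, so the passed context must still be Hive-enabled; a plain in-memory `SQLContext` would fail the cast at startup. A usage sketch, assuming a Hive-enabled `sqlContext` obtained as in the commit message example above:

```scala
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

// sqlContext must carry a HiveSessionState (e.g. built via
// SparkSession.withHiveSupport(...).wrapped); otherwise the
// asInstanceOf[HiveSessionState] cast in init() throws ClassCastException.
HiveThriftServer2.startWithContext(sqlContext)
```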
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 4e6dcaa8f4..18b78ab506 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -33,9 +33,9 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row => SparkRow}
+import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
import org.apache.spark.sql.execution.command.SetCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
+import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.{Utils => SparkUtils}
@@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation(
statement: String,
confOverlay: JMap[String, String],
runInBackground: Boolean = true)
- (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
+ (sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String])
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with Logging {
@@ -68,7 +68,7 @@ private[hive] class SparkExecuteStatementOperation(
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
- hiveContext.sparkContext.clearJobGroup()
+ sqlContext.sparkContext.clearJobGroup()
logDebug(s"CLOSING $statementId")
cleanup(OperationState.CLOSED)
}
@@ -193,9 +193,9 @@ private[hive] class SparkExecuteStatementOperation(
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
+ val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
// Always use the latest class loader provided by executionHive's state.
- val executionHiveClassLoader =
- hiveContext.sessionState.executionHive.state.getConf.getClassLoader
+ val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
HiveThriftServer2.listener.onStatementStart(
@@ -204,12 +204,12 @@ private[hive] class SparkExecuteStatementOperation(
statement,
statementId,
parentSession.getUsername)
- hiveContext.sparkContext.setJobGroup(statementId, statement)
+ sqlContext.sparkContext.setJobGroup(statementId, statement)
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
- hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+ sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
try {
- result = hiveContext.sql(statement)
+ result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
@@ -220,7 +220,7 @@ private[hive] class SparkExecuteStatementOperation(
HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
iter = {
val useIncrementalCollect =
- hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+ sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
result.toLocalIterator.asScala
} else {
@@ -253,7 +253,7 @@ private[hive] class SparkExecuteStatementOperation(
override def cancel(): Unit = {
logInfo(s"Cancel '$statement' with $statementId")
if (statementId != null) {
- hiveContext.sparkContext.cancelJobGroup(statementId)
+ sqlContext.sparkContext.cancelJobGroup(statementId)
}
cleanup(OperationState.CANCELED)
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 057fbbe6d9..1402e0a687 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -150,7 +150,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
}
if (sessionState.database != null) {
- SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase(
+ SparkSQLEnv.sqlContext.sessionState.catalog.setCurrentDatabase(
s"${sessionState.database}")
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
index 6fe57554cf..1b17a9a56e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -33,17 +33,17 @@ import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
import org.apache.hive.service.server.HiveServer2
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: HiveContext)
+private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext)
extends CLIService(hiveServer)
with ReflectedCompositeService {
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
- val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, hiveContext)
+ val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sqlContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
var sparkServiceUGI: UserGroupInformation = null
@@ -66,7 +66,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: Hiv
getInfoType match {
case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL")
case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL")
- case GetInfoType.CLI_DBMS_VER => new GetInfoValue(hiveContext.sparkContext.version)
+ case GetInfoType.CLI_DBMS_VER => new GetInfoValue(sqlContext.sparkContext.version)
case _ => super.getInfo(sessionHandle, getInfoType)
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 1fa885177e..c24e474d9c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.hive.HiveContext
-private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
+
+private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlContext)
extends Driver
with Logging {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index a44b0d3e8e..268ba2f0bc 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -23,18 +23,19 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
import org.apache.spark.util.Utils
/** A singleton object for the master program. The slaves should not access this. */
private[hive] object SparkSQLEnv extends Logging {
logDebug("Initializing SparkSQLEnv")
- var hiveContext: HiveContext = _
+ var sqlContext: SQLContext = _
var sparkContext: SparkContext = _
def init() {
- if (hiveContext == null) {
+ if (sqlContext == null) {
val sparkConf = new SparkConf(loadDefaults = true)
val maybeSerializer = sparkConf.getOption("spark.serializer")
val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking")
@@ -54,16 +55,16 @@ private[hive] object SparkSQLEnv extends Logging {
maybeKryoReferenceTracking.getOrElse("false"))
sparkContext = new SparkContext(sparkConf)
- hiveContext = new HiveContext(sparkContext)
+ sqlContext = SparkSession.withHiveSupport(sparkContext).wrapped
+ val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
+ sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
+ sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
+ sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
- hiveContext.sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
- hiveContext.sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
- hiveContext.sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
-
- hiveContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
+ sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
if (log.isDebugEnabled) {
- hiveContext.sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
+ sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
.foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") }
}
}
@@ -76,7 +77,7 @@ private[hive] object SparkSQLEnv extends Logging {
if (SparkSQLEnv.sparkContext != null) {
sparkContext.stop()
sparkContext = null
- hiveContext = null
+ sqlContext = null
}
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index a0beffdaa2..1e4c479085 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -27,12 +27,13 @@ import org.apache.hive.service.cli.session.SessionManager
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.hive.service.server.HiveServer2
-import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
-private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: HiveContext)
+private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
extends SessionManager(hiveServer)
with ReflectedCompositeService {
@@ -71,10 +72,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
- val ctx = if (hiveContext.sessionState.hiveThriftServerSingleSession) {
- hiveContext
+ val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
+ val ctx = if (sessionState.hiveThriftServerSingleSession) {
+ sqlContext
} else {
- hiveContext.newSession()
+ sqlContext.newSession()
}
ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
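The session manager preserves the earlier isolation behavior: in single-session mode every connection shares the one `SQLContext`, while the default path hands each connection `sqlContext.newSession()`, which shares the `SparkContext` and cached data but isolates SQL configuration, temporary tables, and registered functions. A small sketch of that isolation, assuming a Hive-enabled `sqlContext` as above:

```scala
// newSession() shares the underlying SparkContext but gives the new
// context its own SQLConf, temporary tables and registered functions.
val isolated = sqlContext.newSession()

isolated.setConf("spark.sql.shuffle.partitions", "4")
// The original context's configuration is untouched:
println(sqlContext.getConf("spark.sql.shuffle.partitions", "<default>"))
```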
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index da410c68c8..79625239de 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -26,7 +26,8 @@ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operati
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hive.HiveSessionState
import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation}
/**
@@ -39,17 +40,18 @@ private[thriftserver] class SparkSQLOperationManager()
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
val sessionToActivePool = Map[SessionHandle, String]()
- val sessionToContexts = Map[SessionHandle, HiveContext]()
+ val sessionToContexts = Map[SessionHandle, SQLContext]()
override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
- val hiveContext = sessionToContexts(parentSession.getSessionHandle)
- val runInBackground = async && hiveContext.sessionState.hiveThriftServerAsync
+ val sqlContext = sessionToContexts(parentSession.getSessionHandle)
+ val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
+ val runInBackground = async && sessionState.hiveThriftServerAsync
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
- runInBackground)(hiveContext, sessionToActivePool)
+ runInBackground)(sqlContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
logDebug(s"Created Operation for $statement with session=$parentSession, " +
s"runInBackground=$runInBackground")