From 3c5e65c339a9b4d5e01375d7f073e444898d34c8 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 25 Apr 2016 13:23:05 -0700 Subject: [SPARK-14721][SQL] Remove HiveContext (part 2) ## What changes were proposed in this pull request? This removes the class `HiveContext` itself along with all code usages associated with it. The bulk of the work was already done in #12485. This is mainly just code cleanup and actually removing the class. Note: A couple of things will break after this patch. These will be fixed separately. - the python HiveContext - all the documentation / comments referencing HiveContext - there will be no more HiveContext in the REPL (fixed by #12589) ## How was this patch tested? No change in functionality. Author: Andrew Or Closes #12585 from andrewor14/delete-hive-context. --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 17 ++++++++-------- .../SparkExecuteStatementOperation.scala | 22 ++++++++++----------- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 2 +- .../sql/hive/thriftserver/SparkSQLCLIService.scala | 8 ++++---- .../sql/hive/thriftserver/SparkSQLDriver.scala | 6 +++--- .../spark/sql/hive/thriftserver/SparkSQLEnv.scala | 23 +++++++++++----------- .../hive/thriftserver/SparkSQLSessionManager.scala | 12 ++++++----- .../server/SparkSQLOperationManager.scala | 12 ++++++----- 8 files changed, 54 insertions(+), 48 deletions(-) (limited to 'sql/hive-thriftserver') diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 6703cdbac3..24a25023a6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -33,7 +33,8 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.HiveSessionState import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab import org.apache.spark.sql.internal.SQLConf @@ -53,9 +54,9 @@ object HiveThriftServer2 extends Logging { * Starts a new thrift server with the given context. */ @DeveloperApi - def startWithContext(sqlContext: HiveContext): Unit = { + def startWithContext(sqlContext: SQLContext): Unit = { val server = new HiveThriftServer2(sqlContext) - server.init(sqlContext.sessionState.hiveconf) + server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) server.start() listener = new HiveThriftServer2Listener(server, sqlContext.conf) sqlContext.sparkContext.addSparkListener(listener) @@ -82,11 +83,11 @@ object HiveThriftServer2 extends Logging { } try { - val server = new HiveThriftServer2(SparkSQLEnv.hiveContext) - server.init(SparkSQLEnv.hiveContext.sessionState.hiveconf) + val server = new HiveThriftServer2(SparkSQLEnv.sqlContext) + server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) server.start() logInfo("HiveThriftServer2 started") - listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf) + listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf) SparkSQLEnv.sparkContext.addSparkListener(listener) uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) { Some(new ThriftServerTab(SparkSQLEnv.sparkContext)) @@ -261,7 +262,7 @@ object HiveThriftServer2 extends Logging { } } -private[hive] class HiveThriftServer2(hiveContext: HiveContext) +private[hive] class HiveThriftServer2(sqlContext: SQLContext) extends HiveServer2 with ReflectedCompositeService { // state is tracked internally so that the server only attempts to shut down if it successfully @@ -269,7 +270,7 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext) private val started = new AtomicBoolean(false) override def init(hiveConf: HiveConf) { - val sparkSqlCliService = new SparkSQLCLIService(this, hiveContext) + val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 4e6dcaa8f4..18b78ab506 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -33,9 +33,9 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Row => SparkRow} +import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext} import org.apache.spark.sql.execution.command.SetCommand -import org.apache.spark.sql.hive.{HiveContext, HiveUtils} +import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{Utils => SparkUtils} @@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation( statement: String, confOverlay: JMap[String, String], runInBackground: Boolean = true) - (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String]) + (sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String]) extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with Logging { @@ -68,7 +68,7 @@ private[hive] class SparkExecuteStatementOperation( def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. - hiveContext.sparkContext.clearJobGroup() + sqlContext.sparkContext.clearJobGroup() logDebug(s"CLOSING $statementId") cleanup(OperationState.CLOSED) } @@ -193,9 +193,9 @@ private[hive] class SparkExecuteStatementOperation( statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) + val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = - hiveContext.sessionState.executionHive.state.getConf.getClassLoader + val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) HiveThriftServer2.listener.onStatementStart( @@ -204,12 +204,12 @@ private[hive] class SparkExecuteStatementOperation( statement, statementId, parentSession.getUsername) - hiveContext.sparkContext.setJobGroup(statementId, statement) + sqlContext.sparkContext.setJobGroup(statementId, statement) sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => - hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) + sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } try { - result = hiveContext.sql(statement) + result = sqlContext.sql(statement) logDebug(result.queryExecution.toString()) result.queryExecution.logical match { case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) => @@ -220,7 +220,7 @@ private[hive] class SparkExecuteStatementOperation( HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString()) iter = { val useIncrementalCollect = - hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean if (useIncrementalCollect) { result.toLocalIterator.asScala } else { @@ -253,7 +253,7 @@ private[hive] class SparkExecuteStatementOperation( override def cancel(): Unit = { logInfo(s"Cancel '$statement' with $statementId") if (statementId != null) { - hiveContext.sparkContext.cancelJobGroup(statementId) + sqlContext.sparkContext.cancelJobGroup(statementId) } cleanup(OperationState.CANCELED) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 057fbbe6d9..1402e0a687 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -150,7 +150,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { } if (sessionState.database != null) { - SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase( + SparkSQLEnv.sqlContext.sessionState.catalog.setCurrentDatabase( s"${sessionState.database}") } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 6fe57554cf..1b17a9a56e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -33,17 +33,17 @@ import org.apache.hive.service.auth.HiveAuthFactory import org.apache.hive.service.cli._ import org.apache.hive.service.server.HiveServer2 -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: HiveContext) +private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext) extends CLIService(hiveServer) with ReflectedCompositeService { override def init(hiveConf: HiveConf) { setSuperField(this, "hiveConf", hiveConf) - val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, hiveContext) + val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sqlContext) setSuperField(this, "sessionManager", sparkSqlSessionManager) addService(sparkSqlSessionManager) var sparkServiceUGI: UserGroupInformation = null @@ -66,7 +66,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: Hiv getInfoType match { case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL") case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL") - case GetInfoType.CLI_DBMS_VER => new GetInfoValue(hiveContext.sparkContext.version) + case GetInfoType.CLI_DBMS_VER => new GetInfoValue(sqlContext.sparkContext.version) case _ => super.getInfo(sessionHandle, getInfoType) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 1fa885177e..c24e474d9c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, SQLContext} import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.hive.HiveContext -private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext) + +private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlContext) extends Driver with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index a44b0d3e8e..268ba2f0bc 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -23,18 +23,19 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging -import org.apache.spark.sql.hive.{HiveContext, HiveUtils} +import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} import org.apache.spark.util.Utils /** A singleton object for the master program. The slaves should not access this. */ private[hive] object SparkSQLEnv extends Logging { logDebug("Initializing SparkSQLEnv") - var hiveContext: HiveContext = _ + var sqlContext: SQLContext = _ var sparkContext: SparkContext = _ def init() { - if (hiveContext == null) { + if (sqlContext == null) { val sparkConf = new SparkConf(loadDefaults = true) val maybeSerializer = sparkConf.getOption("spark.serializer") val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking") @@ -54,16 +55,16 @@ private[hive] object SparkSQLEnv extends Logging { maybeKryoReferenceTracking.getOrElse("false")) sparkContext = new SparkContext(sparkConf) - hiveContext = new HiveContext(sparkContext) + sqlContext = SparkSession.withHiveSupport(sparkContext).wrapped + val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] + sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) + sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) + sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) - hiveContext.sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) - hiveContext.sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) - hiveContext.sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) - - hiveContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) + sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) if (log.isDebugEnabled) { - hiveContext.sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted + sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") } } } @@ -76,7 +77,7 @@ private[hive] object SparkSQLEnv extends Logging { if (SparkSQLEnv.sparkContext != null) { sparkContext.stop() sparkContext = null - hiveContext = null + sqlContext = null } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index a0beffdaa2..1e4c479085 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -27,12 +27,13 @@ import org.apache.hive.service.cli.session.SessionManager import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.hive.service.server.HiveServer2 -import org.apache.spark.sql.hive.{HiveContext, HiveUtils} +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager -private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: HiveContext) +private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext) extends SessionManager(hiveServer) with ReflectedCompositeService { @@ -71,10 +72,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: val session = super.getSession(sessionHandle) HiveThriftServer2.listener.onSessionCreated( session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) - val ctx = if (hiveContext.sessionState.hiveThriftServerSingleSession) { - hiveContext + val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] + val ctx = if (sessionState.hiveThriftServerSingleSession) { + sqlContext } else { - hiveContext.newSession() + sqlContext.newSession() } ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index da410c68c8..79625239de 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -26,7 +26,8 @@ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operati import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging -import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.HiveSessionState import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation} /** @@ -39,17 +40,18 @@ private[thriftserver] class SparkSQLOperationManager() .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = Map[SessionHandle, String]() - val sessionToContexts = Map[SessionHandle, HiveContext]() + val sessionToContexts = Map[SessionHandle, SQLContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { - val hiveContext = sessionToContexts(parentSession.getSessionHandle) - val runInBackground = async && hiveContext.sessionState.hiveThriftServerAsync + val sqlContext = sessionToContexts(parentSession.getSessionHandle) + val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] + val runInBackground = async && sessionState.hiveThriftServerAsync val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, - runInBackground)(hiveContext, sessionToActivePool) + runInBackground)(sqlContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") -- cgit v1.2.3