diff options
author | Davies Liu <davies@databricks.com> | 2015-10-08 17:34:24 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-10-08 17:34:24 -0700 |
commit | 3390b400d04e40f767d8a51f1078fcccb4e64abd (patch) | |
tree | d48ed36a14abf0b15467c9ae9c7c04933fdd3a19 /sql/hive-thriftserver/src/main | |
parent | 84ea287178247c163226e835490c9c70b17d8d3b (diff) | |
download | spark-3390b400d04e40f767d8a51f1078fcccb4e64abd.tar.gz spark-3390b400d04e40f767d8a51f1078fcccb4e64abd.tar.bz2 spark-3390b400d04e40f767d8a51f1078fcccb4e64abd.zip |
[SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL
This PR improve the sessions management by replacing the thread-local based to one SQLContext per session approach, introduce separated temporary tables and UDFs/UDAFs for each session.
A new session of SQLContext could be created by:
1) create an new SQLContext
2) call newSession() on existing SQLContext
For HiveContext, in order to reduce the cost for each session, the classloader and Hive client are shared across multiple sessions (created by newSession).
CacheManager is also shared by multiple sessions, so cache a table multiple times in different sessions will not cause multiple copies of in-memory cache.
Added jars are still shared by all the sessions, because SparkContext does not support sessions.
cc marmbrus yhuai rxin
Author: Davies Liu <davies@databricks.com>
Closes #8909 from davies/sessions.
Diffstat (limited to 'sql/hive-thriftserver/src/main')
3 files changed, 19 insertions, 71 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 306f98bcb5..719b03e1c7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -20,19 +20,15 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction import java.sql.{Date, Timestamp} import java.util.concurrent.RejectedExecutionException -import java.util.{Arrays, Map => JMap, UUID} +import java.util.{Arrays, UUID, Map => JMap} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} import scala.util.control.NonFatal -import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.hive.service.cli._ -import org.apache.hadoop.hive.ql.metadata.Hive -import org.apache.hadoop.hive.ql.metadata.HiveException -import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.shims.Utils +import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession @@ -40,7 +36,7 @@ import org.apache.spark.Logging import org.apache.spark.sql.execution.SetCommand import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf} +import org.apache.spark.sql.{DataFrame, SQLConf, Row => SparkRow} private[hive] class SparkExecuteStatementOperation( @@ -143,30 +139,15 @@ private[hive] class SparkExecuteStatementOperation( if (!runInBackground) { runInternal() } else { - val parentSessionState = SessionState.get() - val hiveConf = getConfigForOperation() val sparkServiceUGI = Utils.getUGI() - val sessionHive = getCurrentHive() - val currentSqlSession = hiveContext.currentSession // Runnable impl to call runInternal asynchronously, // from a different thread val backgroundOperation = new Runnable() { override def run(): Unit = { - val doAsAction = new PrivilegedExceptionAction[Object]() { - override def run(): Object = { - - // User information is part of the metastore client member in Hive - hiveContext.setSession(currentSqlSession) - // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = - hiveContext.executionHive.state.getConf.getClassLoader - sessionHive.getConf.setClassLoader(executionHiveClassLoader) - parentSessionState.getConf.setClassLoader(executionHiveClassLoader) - - Hive.set(sessionHive) - SessionState.setCurrentSessionState(parentSessionState) + val doAsAction = new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { try { runInternal() } catch { @@ -174,7 +155,6 @@ private[hive] class SparkExecuteStatementOperation( setOperationException(e) log.error("Error running hive query: ", e) } - return null } } @@ -191,7 +171,7 @@ private[hive] class SparkExecuteStatementOperation( try { // This submit blocks if no background threads are available to run this operation val backgroundHandle = - getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation) + parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => @@ -210,6 +190,11 @@ private[hive] class SparkExecuteStatementOperation( statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) + // Always use the latest class loader provided by executionHive's state. + val executionHiveClassLoader = + hiveContext.executionHive.state.getConf.getClassLoader + Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + HiveThriftServer2.listener.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, @@ -279,43 +264,4 @@ private[hive] class SparkExecuteStatementOperation( } } } - - /** - * If there are query specific settings to overlay, then create a copy of config - * There are two cases we need to clone the session config that's being passed to hive driver - * 1. Async query - - * If the client changes a config setting, that shouldn't reflect in the execution - * already underway - * 2. confOverlay - - * The query specific settings should only be applied to the query config and not session - * @return new configuration - * @throws HiveSQLException - */ - private def getConfigForOperation(): HiveConf = { - var sqlOperationConf = getParentSession().getHiveConf() - if (!getConfOverlay().isEmpty() || runInBackground) { - // clone the partent session config for this query - sqlOperationConf = new HiveConf(sqlOperationConf) - - // apply overlay query specific settings, if any - getConfOverlay().asScala.foreach { case (k, v) => - try { - sqlOperationConf.verifyAndSet(k, v) - } catch { - case e: IllegalArgumentException => - throw new HiveSQLException("Error applying statement specific settings", e) - } - } - } - return sqlOperationConf - } - - private def getCurrentHive(): Hive = { - try { - return Hive.get() - } catch { - case e: HiveException => - throw new HiveSQLException("Failed to get current Hive object", e); - } - } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 92ac0ec3fc..33aaead3fb 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -36,7 +36,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: extends SessionManager(hiveServer) with ReflectedCompositeService { - private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) + private lazy val sparkSqlOperationManager = new SparkSQLOperationManager() override def init(hiveConf: HiveConf) { setSuperField(this, "hiveConf", hiveConf) @@ -60,13 +60,15 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: sessionConf: java.util.Map[String, String], withImpersonation: Boolean, delegationToken: String): SessionHandle = { - hiveContext.openSession() val sessionHandle = super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation, delegationToken) val session = super.getSession(sessionHandle) HiveThriftServer2.listener.onSessionCreated( session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) + val ctx = hiveContext.newSession() + ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) + sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx sessionHandle } @@ -74,7 +76,6 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString) super.closeSession(sessionHandle) sparkSqlOperationManager.sessionToActivePool -= sessionHandle - - hiveContext.detachSession() + sparkSqlOperationManager.sessionToContexts.remove(sessionHandle) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index c8031ed0f3..476651a559 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -30,20 +30,21 @@ import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, R /** * Executes queries using Spark SQL, and maintains a list of handles to active queries. */ -private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) +private[thriftserver] class SparkSQLOperationManager() extends OperationManager with Logging { val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") val sessionToActivePool = Map[SessionHandle, String]() + val sessionToContexts = Map[SessionHandle, HiveContext]() override def newExecuteStatementOperation( parentSession: HiveSession, statement: String, confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { - + val hiveContext = sessionToContexts(parentSession.getSessionHandle) val runInBackground = async && hiveContext.hiveThriftServerAsync val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)(hiveContext, sessionToActivePool) |