author    Davies Liu <davies@databricks.com>  2015-10-08 17:34:24 -0700
committer Davies Liu <davies.liu@gmail.com>  2015-10-08 17:34:24 -0700
commit    3390b400d04e40f767d8a51f1078fcccb4e64abd
tree      d48ed36a14abf0b15467c9ae9c7c04933fdd3a19
parent    84ea287178247c163226e835490c9c70b17d8d3b
[SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL
This PR improves session management by replacing the thread-local-based approach with one SQLContext per session, introducing separate temporary tables and UDFs/UDAFs for each session.

A new SQLContext session can be created by: 1) creating a new SQLContext, or 2) calling newSession() on an existing SQLContext.

For HiveContext, in order to reduce the per-session cost, the classloader and the Hive client are shared across the sessions created by newSession(). The CacheManager is also shared across sessions, so caching a table in different sessions will not create multiple copies of the in-memory cache. Added jars are still shared by all sessions, because SparkContext does not support sessions.

cc marmbrus yhuai rxin

Author: Davies Liu <davies@databricks.com>

Closes #8909 from davies/sessions.
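As a quick illustration of the API described above — a minimal sketch, assuming a running SparkContext named `sc` (the variable names are illustrative, not from the patch):

    import org.apache.spark.sql.SQLContext

    val session1 = new SQLContext(sc)      // 1) create a new SQLContext
    val session2 = session1.newSession()   // 2) derive a new session from an existing one

    // Temporary tables are now per-session:
    session1.range(10).registerTempTable("t")
    session1.sql("SELECT count(*) FROM t").show()   // "t" is visible in session1
    // session2.sql("SELECT count(*) FROM t")       // would fail: "t" does not exist in session2

Both sessions share the same SparkContext, in-memory cache, and added jars; only temporary tables and UDFs/UDAFs are isolated.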
Diffstat (limited to 'sql/hive-thriftserver/src/main')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 76
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala | 9
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 5
3 files changed, 19 insertions(+), 71 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 306f98bcb5..719b03e1c7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -20,19 +20,15 @@ package org.apache.spark.sql.hive.thriftserver
import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
import java.util.concurrent.RejectedExecutionException
-import java.util.{Arrays, Map => JMap, UUID}
+import java.util.{Arrays, UUID, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.util.control.NonFatal
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hive.service.cli._
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.metadata.HiveException
-import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.Utils
+import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
@@ -40,7 +36,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
+import org.apache.spark.sql.{DataFrame, SQLConf, Row => SparkRow}
private[hive] class SparkExecuteStatementOperation(
@@ -143,30 +139,15 @@ private[hive] class SparkExecuteStatementOperation(
if (!runInBackground) {
runInternal()
} else {
- val parentSessionState = SessionState.get()
- val hiveConf = getConfigForOperation()
val sparkServiceUGI = Utils.getUGI()
- val sessionHive = getCurrentHive()
- val currentSqlSession = hiveContext.currentSession
// Runnable impl to call runInternal asynchronously,
// from a different thread
val backgroundOperation = new Runnable() {
override def run(): Unit = {
- val doAsAction = new PrivilegedExceptionAction[Object]() {
- override def run(): Object = {
-
- // User information is part of the metastore client member in Hive
- hiveContext.setSession(currentSqlSession)
- // Always use the latest class loader provided by executionHive's state.
- val executionHiveClassLoader =
- hiveContext.executionHive.state.getConf.getClassLoader
- sessionHive.getConf.setClassLoader(executionHiveClassLoader)
- parentSessionState.getConf.setClassLoader(executionHiveClassLoader)
-
- Hive.set(sessionHive)
- SessionState.setCurrentSessionState(parentSessionState)
+ val doAsAction = new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
try {
runInternal()
} catch {
@@ -174,7 +155,6 @@ private[hive] class SparkExecuteStatementOperation(
setOperationException(e)
log.error("Error running hive query: ", e)
}
- return null
}
}
@@ -191,7 +171,7 @@ private[hive] class SparkExecuteStatementOperation(
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
- getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+ parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
@@ -210,6 +190,11 @@ private[hive] class SparkExecuteStatementOperation(
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
+ // Always use the latest class loader provided by executionHive's state.
+ val executionHiveClassLoader =
+ hiveContext.executionHive.state.getConf.getClassLoader
+ Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
+
HiveThriftServer2.listener.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
@@ -279,43 +264,4 @@ private[hive] class SparkExecuteStatementOperation(
}
}
}
-
- /**
- * If there are query specific settings to overlay, then create a copy of config
- * There are two cases we need to clone the session config that's being passed to hive driver
- * 1. Async query -
- * If the client changes a config setting, that shouldn't reflect in the execution
- * already underway
- * 2. confOverlay -
- * The query specific settings should only be applied to the query config and not session
- * @return new configuration
- * @throws HiveSQLException
- */
- private def getConfigForOperation(): HiveConf = {
- var sqlOperationConf = getParentSession().getHiveConf()
- if (!getConfOverlay().isEmpty() || runInBackground) {
- // clone the parent session config for this query
- sqlOperationConf = new HiveConf(sqlOperationConf)
-
- // apply overlay query specific settings, if any
- getConfOverlay().asScala.foreach { case (k, v) =>
- try {
- sqlOperationConf.verifyAndSet(k, v)
- } catch {
- case e: IllegalArgumentException =>
- throw new HiveSQLException("Error applying statement specific settings", e)
- }
- }
- }
- return sqlOperationConf
- }
-
- private def getCurrentHive(): Hive = {
- try {
- return Hive.get()
- } catch {
- case e: HiveException =>
- throw new HiveSQLException("Failed to get current Hive object", e);
- }
- }
}
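The net effect of the changes above: instead of capturing Hive's thread-local SessionState, HiveConf, and Hive client and restoring them inside the background thread, the operation now only resets the thread's context classloader from executionHive's state at the start of execution. What remains of the async path is a plain doAs wrapper — a minimal sketch (the `runAsUser` helper is a hypothetical illustration, not part of the patch):

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    // Run `body` as the given user, the way the background Runnable above
    // invokes runInternal() once the manual Hive state juggling is gone.
    def runAsUser(ugi: UserGroupInformation)(body: => Unit): Unit =
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = body
      })

Typing the action as PrivilegedExceptionAction[Unit] also removes the need for the old `return null`.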
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 92ac0ec3fc..33aaead3fb 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -36,7 +36,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
extends SessionManager(hiveServer)
with ReflectedCompositeService {
- private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+ private lazy val sparkSqlOperationManager = new SparkSQLOperationManager()
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
@@ -60,13 +60,15 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
sessionConf: java.util.Map[String, String],
withImpersonation: Boolean,
delegationToken: String): SessionHandle = {
- hiveContext.openSession()
val sessionHandle =
super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation,
delegationToken)
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
+ val ctx = hiveContext.newSession()
+ ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
+ sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
sessionHandle
}
@@ -74,7 +76,6 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
super.closeSession(sessionHandle)
sparkSqlOperationManager.sessionToActivePool -= sessionHandle
-
- hiveContext.detachSession()
+ sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
}
}
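The session manager now pairs each Thrift session with its own HiveContext created via newSession(). A minimal sketch of that bookkeeping (the onOpen/onClose names and the `rootContext` parameter are illustrative; the patch does this inline in openSession/closeSession):

    import scala.collection.mutable
    import org.apache.hive.service.cli.SessionHandle
    import org.apache.spark.sql.hive.HiveContext

    val sessionToContexts = mutable.Map[SessionHandle, HiveContext]()

    def onOpen(handle: SessionHandle, rootContext: HiveContext): Unit = {
      // newSession() shares the Hive client and classloader with rootContext,
      // but gets its own temporary tables and UDFs/UDAFs.
      val ctx = rootContext.newSession()
      ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
      sessionToContexts += handle -> ctx
    }

    def onClose(handle: SessionHandle): Unit =
      sessionToContexts.remove(handle)   // the session's temporary state goes with it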
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index c8031ed0f3..476651a559 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -30,20 +30,21 @@ import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, R
/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
*/
-private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
+private[thriftserver] class SparkSQLOperationManager()
extends OperationManager with Logging {
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
val sessionToActivePool = Map[SessionHandle, String]()
+ val sessionToContexts = Map[SessionHandle, HiveContext]()
override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
-
+ val hiveContext = sessionToContexts(parentSession.getSessionHandle)
val runInBackground = async && hiveContext.hiveThriftServerAsync
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
runInBackground)(hiveContext, sessionToActivePool)
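With the constructor parameter gone, SparkSQLOperationManager resolves each session's HiveContext from the shared map when a statement is created, so every statement runs against the context of the session that submitted it. The patch uses a direct map lookup; a slightly more defensive variant of the same idea (our sketch, not in the patch):

    // Resolve the per-session context registered by SparkSQLSessionManager;
    // fail loudly if the session was never registered.
    val hiveContext = sessionToContexts.getOrElse(
      parentSession.getSessionHandle,
      throw new IllegalStateException(
        s"No HiveContext registered for session ${parentSession.getSessionHandle}"))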