aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sql/hive-thriftserver/src/main')
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala9
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala3
2 files changed, 9 insertions, 3 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 6b3275b4ea..89e9ede726 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -27,11 +27,14 @@ import org.apache.hive.service.cli.session.SessionManager
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
+import org.apache.hive.service.cli.SessionHandle
private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
extends SessionManager
with ReflectedCompositeService {
+ private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
@@ -40,10 +43,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
getAncestorField[Log](this, 3, "LOG").info(
s"HiveServer2: Async execution pool size $backgroundPoolSize")
- val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
setSuperField(this, "operationManager", sparkSqlOperationManager)
addService(sparkSqlOperationManager)
initCompositeService(hiveConf)
}
+
+ override def closeSession(sessionHandle: SessionHandle) {
+ super.closeSession(sessionHandle)
+ sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+ }
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 99c4f46a82..9c0bf02391 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -36,8 +36,7 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
- // TODO: Currenlty this will grow infinitely, even as sessions expire
- val sessionToActivePool = Map[HiveSession, String]()
+ val sessionToActivePool = Map[SessionHandle, String]()
override def newExecuteStatementOperation(
parentSession: HiveSession,