diff options
Diffstat (limited to 'sql/hive-thriftserver/src/main')
2 files changed, 9 insertions, 3 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 6b3275b4ea..89e9ede726 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -27,11 +27,14 @@ import org.apache.hive.service.cli.session.SessionManager import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager +import org.apache.hive.service.cli.SessionHandle private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) extends SessionManager with ReflectedCompositeService { + private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) + override def init(hiveConf: HiveConf) { setSuperField(this, "hiveConf", hiveConf) @@ -40,10 +43,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) getAncestorField[Log](this, 3, "LOG").info( s"HiveServer2: Async execution pool size $backgroundPoolSize") - val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) setSuperField(this, "operationManager", sparkSqlOperationManager) addService(sparkSqlOperationManager) initCompositeService(hiveConf) } + + override def closeSession(sessionHandle: SessionHandle) { + super.closeSession(sessionHandle) + sparkSqlOperationManager.sessionToActivePool -= sessionHandle + } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 99c4f46a82..9c0bf02391 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -36,8 +36,7 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") - // TODO: Currenlty this will grow infinitely, even as sessions expire - val sessionToActivePool = Map[HiveSession, String]() + val sessionToActivePool = Map[SessionHandle, String]() override def newExecuteStatementOperation( parentSession: HiveSession, |