From 22ddb6e0338f4d101389a0655424a8fde6c4cff4 Mon Sep 17 00:00:00 2001 From: guowei2 Date: Thu, 18 Dec 2014 20:10:23 -0800 Subject: [SPARK-4756][SQL] FIX: sessionToActivePool grow infinitely, even as sessions expire **sessionToActivePool** in **SparkSQLOperationManager** grows infinitely, even as sessions expire. We should remove the pool value when the session is closed, even though not every session has an entry in **sessionToActivePool**. Author: guowei2 Closes #3617 from guowei2/SPARK-4756 and squashes the following commits: e9b97b8 [guowei2] fix compile bug with Shim12 cf0f521 [guowei2] Merge remote-tracking branch 'apache/master' into SPARK-4756 e070998 [guowei2] fix: remove active pool of the session when it expired --- .../spark/sql/hive/thriftserver/SparkSQLSessionManager.scala | 9 ++++++++- .../sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 3 +-- .../scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala | 6 +++--- .../scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala | 6 +++--- 4 files changed, 15 insertions(+), 9 deletions(-) (limited to 'sql') diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 6b3275b4ea..89e9ede726 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -27,11 +27,14 @@ import org.apache.hive.service.cli.session.SessionManager import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager +import org.apache.hive.service.cli.SessionHandle private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) extends SessionManager with 
ReflectedCompositeService { + private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) + override def init(hiveConf: HiveConf) { setSuperField(this, "hiveConf", hiveConf) @@ -40,10 +43,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) getAncestorField[Log](this, 3, "LOG").info( s"HiveServer2: Async execution pool size $backgroundPoolSize") - val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) setSuperField(this, "operationManager", sparkSqlOperationManager) addService(sparkSqlOperationManager) initCompositeService(hiveConf) } + + override def closeSession(sessionHandle: SessionHandle) { + super.closeSession(sessionHandle) + sparkSqlOperationManager.sessionToActivePool -= sessionHandle + } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 99c4f46a82..9c0bf02391 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -36,8 +36,7 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") - // TODO: Currenlty this will grow infinitely, even as sessions expire - val sessionToActivePool = Map[HiveSession, String]() + val sessionToActivePool = Map[SessionHandle, String]() override def newExecuteStatementOperation( parentSession: HiveSession, diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala index 9258ad0cdf..5550183621 
100644 --- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala @@ -69,7 +69,7 @@ private[hive] class SparkExecuteStatementOperation( statement: String, confOverlay: JMap[String, String])( hiveContext: HiveContext, - sessionToActivePool: SMap[HiveSession, String]) + sessionToActivePool: SMap[SessionHandle, String]) extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging { private var result: SchemaRDD = _ @@ -191,14 +191,14 @@ private[hive] class SparkExecuteStatementOperation( logDebug(result.queryExecution.toString()) result.queryExecution.logical match { case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) => - sessionToActivePool(parentSession) = value + sessionToActivePool(parentSession.getSessionHandle) = value logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") case _ => } val groupId = round(random * 1000000).toString hiveContext.sparkContext.setJobGroup(groupId, statement) - sessionToActivePool.get(parentSession).foreach { pool => + sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } iter = { diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index 17f1ad3e46..798a690a20 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -69,7 +69,7 @@ private[hive] class SparkExecuteStatementOperation( confOverlay: JMap[String, String], runInBackground: Boolean = true)( hiveContext: HiveContext, - sessionToActivePool: SMap[HiveSession, 
String]) + sessionToActivePool: SMap[SessionHandle, String]) // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging { @@ -162,14 +162,14 @@ private[hive] class SparkExecuteStatementOperation( logDebug(result.queryExecution.toString()) result.queryExecution.logical match { case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) => - sessionToActivePool(parentSession) = value + sessionToActivePool(parentSession.getSessionHandle) = value logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") case _ => } val groupId = round(random * 1000000).toString hiveContext.sparkContext.setJobGroup(groupId, statement) - sessionToActivePool.get(parentSession).foreach { pool => + sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } iter = { -- cgit v1.2.3