aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorguowei2 <guowei2@asiainfo.com>2014-12-18 20:10:23 -0800
committerMichael Armbrust <michael@databricks.com>2014-12-18 20:10:23 -0800
commit22ddb6e0338f4d101389a0655424a8fde6c4cff4 (patch)
tree6d6f15457970d5f1db8c31cf30e149834981bca8 /sql
parentb68bc6d2647f8a5caf8aa558e4115f9cc254f67c (diff)
downloadspark-22ddb6e0338f4d101389a0655424a8fde6c4cff4.tar.gz
spark-22ddb6e0338f4d101389a0655424a8fde6c4cff4.tar.bz2
spark-22ddb6e0338f4d101389a0655424a8fde6c4cff4.zip
[SPARK-4756][SQL] FIX: sessionToActivePool grows infinitely, even as sessions expire
**sessionToActivePool** in **SparkSQLOperationManager** grows infinitely, even as sessions expire. We should remove the pool value when the session is closed, even though **sessionToActivePool** may not contain an entry for every session. Author: guowei2 <guowei2@asiainfo.com> Closes #3617 from guowei2/SPARK-4756 and squashes the following commits: e9b97b8 [guowei2] fix compile bug with Shim12 cf0f521 [guowei2] Merge remote-tracking branch 'apache/master' into SPARK-4756 e070998 [guowei2] fix: remove active pool of the session when it expired
Diffstat (limited to 'sql')
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala9
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala3
-rw-r--r--sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala6
-rw-r--r--sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala6
4 files changed, 15 insertions, 9 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 6b3275b4ea..89e9ede726 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -27,11 +27,14 @@ import org.apache.hive.service.cli.session.SessionManager
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
+import org.apache.hive.service.cli.SessionHandle
private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
extends SessionManager
with ReflectedCompositeService {
+ private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
@@ -40,10 +43,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
getAncestorField[Log](this, 3, "LOG").info(
s"HiveServer2: Async execution pool size $backgroundPoolSize")
- val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
setSuperField(this, "operationManager", sparkSqlOperationManager)
addService(sparkSqlOperationManager)
initCompositeService(hiveConf)
}
+
+ override def closeSession(sessionHandle: SessionHandle) {
+ super.closeSession(sessionHandle)
+ sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+ }
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 99c4f46a82..9c0bf02391 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -36,8 +36,7 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
- // TODO: Currenlty this will grow infinitely, even as sessions expire
- val sessionToActivePool = Map[HiveSession, String]()
+ val sessionToActivePool = Map[SessionHandle, String]()
override def newExecuteStatementOperation(
parentSession: HiveSession,
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 9258ad0cdf..5550183621 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -69,7 +69,7 @@ private[hive] class SparkExecuteStatementOperation(
statement: String,
confOverlay: JMap[String, String])(
hiveContext: HiveContext,
- sessionToActivePool: SMap[HiveSession, String])
+ sessionToActivePool: SMap[SessionHandle, String])
extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging {
private var result: SchemaRDD = _
@@ -191,14 +191,14 @@ private[hive] class SparkExecuteStatementOperation(
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
- sessionToActivePool(parentSession) = value
+ sessionToActivePool(parentSession.getSessionHandle) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
- sessionToActivePool.get(parentSession).foreach { pool =>
+ sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 17f1ad3e46..798a690a20 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -69,7 +69,7 @@ private[hive] class SparkExecuteStatementOperation(
confOverlay: JMap[String, String],
runInBackground: Boolean = true)(
hiveContext: HiveContext,
- sessionToActivePool: SMap[HiveSession, String])
+ sessionToActivePool: SMap[SessionHandle, String])
// NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {
@@ -162,14 +162,14 @@ private[hive] class SparkExecuteStatementOperation(
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
- sessionToActivePool(parentSession) = value
+ sessionToActivePool(parentSession.getSessionHandle) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
- sessionToActivePool.get(parentSession).foreach { pool =>
+ sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {