diff options
author | huangzhaowei <carlmartinmax@gmail.com> | 2015-07-09 19:31:31 -0700 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-07-09 19:31:31 -0700 |
commit | 1903641e68ce7e7e657584bf45e91db6df357e41 (patch) | |
tree | 296b374952fc877cf5c3aee25640f497fde996ed /sql/hive-thriftserver/src/main/scala | |
parent | 2727304660663fcf1e41f7b666978c1443262e4e (diff) | |
download | spark-1903641e68ce7e7e657584bf45e91db6df357e41.tar.gz spark-1903641e68ce7e7e657584bf45e91db6df357e41.tar.bz2 spark-1903641e68ce7e7e657584bf45e91db6df357e41.zip |
[SPARK-8839] [SQL] ThriftServer2 will remove session and execution no matter it's finished or not.
In my test, `sessions` and `executions` in ThriftServer2 is not the same number as the connection number.
For example, if there are 200 clients connecting to the server, but it will have more than 200 `sessions` and `executions`.
So if it reaches the `retainedStatements`, it has to remove some object which is not finished.
So it may cause the exception described in [Jira Address](https://issues.apache.org/jira/browse/SPARK-8839)
Author: huangzhaowei <carlmartinmax@gmail.com>
Closes #7239 from SaintBacchus/SPARK-8839 and squashes the following commits:
cf7ef40 [huangzhaowei] Remove the a meanless funciton call
3e9a5a6 [huangzhaowei] Add a filter before take
9d5ceb8 [huangzhaowei] [SPARK-8839][SQL]ThriftServer2 will remove session and execution no matter it's finished or not.
Diffstat (limited to 'sql/hive-thriftserver/src/main/scala')
-rw-r--r-- | sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 700d994bb6..b7db80d93f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -179,6 +179,7 @@ object HiveThriftServer2 extends Logging { def onSessionClosed(sessionId: String): Unit = { sessionList(sessionId).finishTimestamp = System.currentTimeMillis onlineSessionNum -= 1 + trimSessionIfNecessary() } def onStatementStart( @@ -206,18 +207,20 @@ object HiveThriftServer2 extends Logging { executionList(id).detail = errorMessage executionList(id).state = ExecutionState.FAILED totalRunning -= 1 + trimExecutionIfNecessary() } def onStatementFinish(id: String): Unit = { executionList(id).finishTimestamp = System.currentTimeMillis executionList(id).state = ExecutionState.FINISHED totalRunning -= 1 + trimExecutionIfNecessary() } private def trimExecutionIfNecessary() = synchronized { if (executionList.size > retainedStatements) { val toRemove = math.max(retainedStatements / 10, 1) - executionList.take(toRemove).foreach { s => + executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s => executionList.remove(s._1) } } @@ -226,7 +229,7 @@ object HiveThriftServer2 extends Logging { private def trimSessionIfNecessary() = synchronized { if (sessionList.size > retainedSessions) { val toRemove = math.max(retainedSessions / 10, 1) - sessionList.take(toRemove).foreach { s => + sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s => sessionList.remove(s._1) } } |