author     huangzhaowei <carlmartinmax@gmail.com>    2016-08-11 11:28:28 +0100
committer  Sean Owen <sowen@cloudera.com>            2016-08-11 11:28:28 +0100
commit     a45fefd17ec4a499b988a2f9931ce397918d3bef (patch)
tree       58b6ee26985c565327d56a8614d4574583fee866 /sql/hive-thriftserver
parent     8a6b7037bb058d00cc767895c3292509576ea2f9 (diff)
[SPARK-16941] Use ConcurrentHashMap instead of Scala Map in SparkSQLOperationManager.
## What changes were proposed in this pull request?

ThriftServer has thread-safety problems in **SparkSQLOperationManager**: its session-tracking maps are plain `scala.collection.mutable.Map`s mutated from multiple handler threads. Replace them with `java.util.concurrent.ConcurrentHashMap` to avoid the races. Details in [SPARK-16941](https://issues.apache.org/jira/browse/SPARK-16941).

## How was this patch tested?

N/A

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #14534 from SaintBacchus/SPARK-16941.
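As a minimal sketch of the race the description alludes to (not code from the patch; `SessionHandle` is simplified to `String` and the registry object is hypothetical), a plain `scala.collection.mutable.Map` gives no guarantees under concurrent mutation, while each operation on a `ConcurrentHashMap` is atomic:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for SparkSQLOperationManager's bookkeeping;
// SessionHandle is simplified to String to keep the sketch self-contained.
object PoolRegistrySketch {
  // Unsafe under concurrency: mutable.Map has no internal synchronization,
  // so simultaneous writes from ThriftServer handler threads can race.
  val unsafePools = scala.collection.mutable.Map[String, String]()

  // Safe: every operation on ConcurrentHashMap is individually atomic.
  val safePools = new ConcurrentHashMap[String, String]()

  def main(args: Array[String]): Unit = {
    val threads = (1 to 8).map { i =>
      new Thread(new Runnable {
        def run(): Unit = {
          safePools.put(s"session-$i", "fair-pool") // atomic insert
          safePools.remove(s"session-$i")           // atomic removal
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(s"entries left: ${safePools.size()}") // deterministically 0
  }
}
```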
Diffstat (limited to 'sql/hive-thriftserver')
 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala  |  9 +++++----
 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala          |  4 ++--
 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 11 ++++++-----
 3 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index b2717ec54e..e555ebd623 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -23,7 +23,7 @@ import java.util.{Arrays, Map => JMap, UUID}
 import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation(
     statement: String,
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)
-    (sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String])
+    (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
   extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
   with Logging {
 
@@ -215,7 +215,8 @@ private[hive] class SparkExecuteStatementOperation(
       statementId,
       parentSession.getUsername)
     sqlContext.sparkContext.setJobGroup(statementId, statement)
-    sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
+    val pool = sessionToActivePool.get(parentSession.getSessionHandle)
+    if (pool != null) {
       sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
     }
     try {
@@ -223,7 +224,7 @@ private[hive] class SparkExecuteStatementOperation(
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
         case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
-          sessionToActivePool(parentSession.getSessionHandle) = value
+          sessionToActivePool.put(parentSession.getSessionHandle, value)
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>
       }
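Note on the hunks above: `scala.collection.mutable.Map.get` returns an `Option`, which is why the old code could call `.foreach`; `ConcurrentHashMap.get` returns the value or `null`, hence the explicit null check. A side-by-side sketch of the two access patterns (illustrative names, not patch code):

```scala
import java.util.concurrent.ConcurrentHashMap

object LookupSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val scalaMap = scala.collection.mutable.Map("s1" -> "poolA")
    val javaMap = new ConcurrentHashMap[String, String]()
    javaMap.put("s1", "poolA")

    // Scala Map: get returns Option[String]; absence is handled by foreach.
    scalaMap.get("s2").foreach(pool => println(s"use $pool")) // prints nothing

    // ConcurrentHashMap: get returns null on absence, so check explicitly,
    // mirroring the patched SparkExecuteStatementOperation above.
    val pool = javaMap.get("s2")
    if (pool != null) println(s"use $pool") // skipped: pool is null
  }
}
```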
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 1e4c479085..6a5117aea4 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -79,14 +79,14 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
       sqlContext.newSession()
     }
     ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
-    sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
+    sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
     sessionHandle
   }
 
   override def closeSession(sessionHandle: SessionHandle) {
     HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
     super.closeSession(sessionHandle)
-    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+    sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
     sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
   }
 }
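The session-manager changes are mechanical: Scala's `+=` and `-=` map directly onto `java.util.Map`'s `put` and `remove`, and removing an absent key is a harmless no-op in both APIs. A small equivalence sketch (simplified `String` keys, not the Spark types):

```scala
import java.util.concurrent.ConcurrentHashMap

object SessionBookkeepingSketch {
  def main(args: Array[String]): Unit = {
    val contexts = new ConcurrentHashMap[String, String]()

    // Formerly: sessionToContexts += sessionHandle -> ctx
    contexts.put("session-1", "ctx-1")

    // Formerly: sessionToActivePool -= sessionHandle
    contexts.remove("session-1")

    // Removing an absent key returns null rather than throwing, matching
    // the forgiving semantics of Scala's -= on a mutable Map.
    assert(contexts.remove("session-1") == null)
    println(s"entries left: ${contexts.size()}") // 0
  }
}
```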
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 79625239de..49ab664009 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.hive.thriftserver.server
 
 import java.util.{Map => JMap}
-
-import scala.collection.mutable.Map
+import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
@@ -39,15 +38,17 @@ private[thriftserver] class SparkSQLOperationManager()
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
-  val sessionToActivePool = Map[SessionHandle, String]()
-  val sessionToContexts = Map[SessionHandle, SQLContext]()
+  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
+  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
 
   override def newExecuteStatementOperation(
       parentSession: HiveSession,
       statement: String,
       confOverlay: JMap[String, String],
       async: Boolean): ExecuteStatementOperation = synchronized {
-    val sqlContext = sessionToContexts(parentSession.getSessionHandle)
+    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
+    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
+      s" initialized or had already closed.")
     val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
     val runInBackground = async && sessionState.hiveThriftServerAsync
     val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
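One behavioral shift worth noting in the last hunk: the old `sessionToContexts(handle)` used Scala's `Map.apply`, which throws `NoSuchElementException` on a missing key, whereas `ConcurrentHashMap.get` silently returns `null`; the added `require` restores fail-fast behavior with a clearer message. A sketch of that guard pattern (simplified types, hypothetical names):

```scala
import java.util.concurrent.ConcurrentHashMap

object LookupGuardSketch {
  private val sessionToContexts = new ConcurrentHashMap[String, String]()

  def lookupContext(sessionHandle: String): String = {
    val sqlContext = sessionToContexts.get(sessionHandle)
    // Fail fast with a readable message instead of a later NullPointerException.
    require(sqlContext != null,
      s"Session handle: $sessionHandle has not been initialized or has already been closed.")
    sqlContext
  }

  def main(args: Array[String]): Unit = {
    sessionToContexts.put("s1", "ctx")
    println(lookupContext("s1")) // "ctx"
    try lookupContext("s2")      // missing key trips the require
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```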