author     Davies Liu <davies@databricks.com>   2015-10-08 17:34:24 -0700
committer  Davies Liu <davies.liu@gmail.com>    2015-10-08 17:34:24 -0700
commit     3390b400d04e40f767d8a51f1078fcccb4e64abd (patch)
tree       d48ed36a14abf0b15467c9ae9c7c04933fdd3a19 /sql/hive-thriftserver
parent     84ea287178247c163226e835490c9c70b17d8d3b (diff)
[SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL
This PR improves session management in SQL by replacing the thread-local based approach with one SQLContext per session, and introduces separate temporary tables and UDFs/UDAFs for each session. A new SQLContext session can be created by: 1) creating a new SQLContext, or 2) calling newSession() on an existing SQLContext.

For HiveContext, in order to reduce the per-session cost, the classloader and Hive client are shared across the sessions created by newSession(). The CacheManager is also shared by multiple sessions, so caching a table multiple times in different sessions will not create multiple copies of the in-memory cache. Added jars are still shared by all sessions, because SparkContext does not support sessions.

cc marmbrus yhuai rxin

Author: Davies Liu <davies@databricks.com>

Closes #8909 from davies/sessions.
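As a rough illustration of the session API described above, a minimal sketch (the SparkContext setup is an assumption for the example, not part of this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    // A minimal sketch, assuming the Spark 1.6-era API this commit targets.
    val sc = new SparkContext("local[2]", "sessions-demo")
    val session1 = new SQLContext(sc)
    val session2 = session1.newSession()        // shares SparkContext, CacheManager and added jars

    // Temporary tables (and UDFs/UDAFs) are now per-session:
    session1.range(10).registerTempTable("t")   // visible only in session1
    // session2.sql("SELECT * FROM t")          // would fail: `t` is not defined in session2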
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 76
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala | 9
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 5
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 8
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 76
5 files changed, 63 insertions, 111 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 306f98bcb5..719b03e1c7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -20,19 +20,15 @@ package org.apache.spark.sql.hive.thriftserver
import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
import java.util.concurrent.RejectedExecutionException
-import java.util.{Arrays, Map => JMap, UUID}
+import java.util.{Arrays, UUID, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.util.control.NonFatal
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hive.service.cli._
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.metadata.HiveException
-import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.Utils
+import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
@@ -40,7 +36,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
+import org.apache.spark.sql.{DataFrame, SQLConf, Row => SparkRow}
private[hive] class SparkExecuteStatementOperation(
@@ -143,30 +139,15 @@ private[hive] class SparkExecuteStatementOperation(
if (!runInBackground) {
runInternal()
} else {
- val parentSessionState = SessionState.get()
- val hiveConf = getConfigForOperation()
val sparkServiceUGI = Utils.getUGI()
- val sessionHive = getCurrentHive()
- val currentSqlSession = hiveContext.currentSession
// Runnable impl to call runInternal asynchronously,
// from a different thread
val backgroundOperation = new Runnable() {
override def run(): Unit = {
- val doAsAction = new PrivilegedExceptionAction[Object]() {
- override def run(): Object = {
-
- // User information is part of the metastore client member in Hive
- hiveContext.setSession(currentSqlSession)
- // Always use the latest class loader provided by executionHive's state.
- val executionHiveClassLoader =
- hiveContext.executionHive.state.getConf.getClassLoader
- sessionHive.getConf.setClassLoader(executionHiveClassLoader)
- parentSessionState.getConf.setClassLoader(executionHiveClassLoader)
-
- Hive.set(sessionHive)
- SessionState.setCurrentSessionState(parentSessionState)
+ val doAsAction = new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
try {
runInternal()
} catch {
@@ -174,7 +155,6 @@ private[hive] class SparkExecuteStatementOperation(
setOperationException(e)
log.error("Error running hive query: ", e)
}
- return null
}
}
@@ -191,7 +171,7 @@ private[hive] class SparkExecuteStatementOperation(
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
- getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+ parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
@@ -210,6 +190,11 @@ private[hive] class SparkExecuteStatementOperation(
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
+ // Always use the latest class loader provided by executionHive's state.
+ val executionHiveClassLoader =
+ hiveContext.executionHive.state.getConf.getClassLoader
+ Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
+
HiveThriftServer2.listener.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
@@ -279,43 +264,4 @@ private[hive] class SparkExecuteStatementOperation(
}
}
}
-
- /**
- * If there are query specific settings to overlay, then create a copy of config
- * There are two cases we need to clone the session config that's being passed to hive driver
- * 1. Async query -
- * If the client changes a config setting, that shouldn't reflect in the execution
- * already underway
- * 2. confOverlay -
- * The query specific settings should only be applied to the query config and not session
- * @return new configuration
- * @throws HiveSQLException
- */
- private def getConfigForOperation(): HiveConf = {
- var sqlOperationConf = getParentSession().getHiveConf()
- if (!getConfOverlay().isEmpty() || runInBackground) {
- // clone the parent session config for this query
- sqlOperationConf = new HiveConf(sqlOperationConf)
-
- // apply overlay query specific settings, if any
- getConfOverlay().asScala.foreach { case (k, v) =>
- try {
- sqlOperationConf.verifyAndSet(k, v)
- } catch {
- case e: IllegalArgumentException =>
- throw new HiveSQLException("Error applying statement specific settings", e)
- }
- }
- }
- return sqlOperationConf
- }
-
- private def getCurrentHive(): Hive = {
- try {
- return Hive.get()
- } catch {
- case e: HiveException =>
- throw new HiveSQLException("Failed to get current Hive object", e);
- }
- }
}
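The rewritten operation above drops the Hive/SessionState juggling and simply installs executionHive's class loader as the thread context class loader before running the statement. A generic Scala sketch of that switch, with a restore-on-exit wrapper added purely for illustration (the wrapper is not in the commit):

    object ClassLoaderSwitch {
      // Runs `body` with the thread's context class loader swapped to `loader`,
      // restoring the previous loader afterwards.
      def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
        val prev = Thread.currentThread().getContextClassLoader
        Thread.currentThread().setContextClassLoader(loader)
        try body finally Thread.currentThread().setContextClassLoader(prev)
      }
    }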
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 92ac0ec3fc..33aaead3fb 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -36,7 +36,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
extends SessionManager(hiveServer)
with ReflectedCompositeService {
- private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+ private lazy val sparkSqlOperationManager = new SparkSQLOperationManager()
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
@@ -60,13 +60,15 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
sessionConf: java.util.Map[String, String],
withImpersonation: Boolean,
delegationToken: String): SessionHandle = {
- hiveContext.openSession()
val sessionHandle =
super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation,
delegationToken)
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
+ val ctx = hiveContext.newSession()
+ ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
+ sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
sessionHandle
}
@@ -74,7 +76,6 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
super.closeSession(sessionHandle)
sparkSqlOperationManager.sessionToActivePool -= sessionHandle
-
- hiveContext.detachSession()
+ sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index c8031ed0f3..476651a559 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -30,20 +30,21 @@ import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, R
/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
*/
-private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
+private[thriftserver] class SparkSQLOperationManager()
extends OperationManager with Logging {
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
val sessionToActivePool = Map[SessionHandle, String]()
+ val sessionToContexts = Map[SessionHandle, HiveContext]()
override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
-
+ val hiveContext = sessionToContexts(parentSession.getSessionHandle)
val runInBackground = async && hiveContext.hiveThriftServerAsync
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
runInBackground)(hiveContext, sessionToActivePool)
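
Taken together, the two files above give each JDBC session its own HiveContext: openSession() registers a fresh newSession() under the SessionHandle, newExecuteStatementOperation() looks it up, and closeSession() removes it. A self-contained sketch of that lifecycle using hypothetical stand-in types (not the actual thriftserver classes):

    import scala.collection.mutable

    final case class SessionHandle(id: String)   // stand-in for Hive's SessionHandle
    final class SessionContext                   // stand-in for a per-session HiveContext

    object SessionRegistry {
      private val sessionToContexts = mutable.Map[SessionHandle, SessionContext]()

      def open(handle: SessionHandle): SessionContext =
        sessionToContexts.getOrElseUpdate(handle, new SessionContext)

      // Fails loudly if a statement arrives for a session that was never opened.
      def lookup(handle: SessionHandle): SessionContext =
        sessionToContexts(handle)

      def close(handle: SessionHandle): Unit =
        sessionToContexts.remove(handle)
    }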
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index e59a14ec00..76d1591a23 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -96,7 +96,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
buffer += s"${new Timestamp(new Date().getTime)} - $source> $line"
// If we haven't found all expected answers and another expected answer comes up...
- if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+ if (next < expectedAnswers.size && line.contains(expectedAnswers(next))) {
next += 1
// If all expected answers have been found...
if (next == expectedAnswers.size) {
@@ -159,7 +159,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;"
-> "OK",
"CACHE TABLE hive_test;"
- -> "Time taken: ",
+ -> "",
"SELECT COUNT(*) FROM hive_test;"
-> "5",
"DROP TABLE hive_test;"
@@ -180,7 +180,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
"CREATE TABLE hive_test(key INT, val STRING);"
-> "OK",
"SHOW TABLES;"
- -> "Time taken: "
+ -> "hive_test"
)
runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))(
@@ -210,7 +210,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
-> "OK",
"INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
- -> "Time taken:",
+ -> "",
"SELECT count(key) FROM t1;"
-> "5",
"DROP TABLE t1;"
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 19b2f24456..ff8ca01506 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -205,6 +205,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
import org.apache.spark.sql.SQLConf
var defaultV1: String = null
var defaultV2: String = null
+ var data: ArrayBuffer[Int] = null
withMultipleConnectionJdbcStatement(
// create table
@@ -214,10 +215,16 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
"DROP TABLE IF EXISTS test_map",
"CREATE TABLE test_map(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map",
- "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC")
+ "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC",
+ "CREATE DATABASE db1")
queries.foreach(statement.execute)
+ val plan = statement.executeQuery("explain select * from test_table")
+ plan.next()
+ plan.next()
+ assert(plan.getString(1).contains("InMemoryColumnarTableScan"))
+
val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
val buf1 = new collection.mutable.ArrayBuffer[Int]()
while (rs1.next()) {
@@ -233,6 +240,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
rs2.close()
assert(buf1 === buf2)
+
+ data = buf1
},
// first session, we get the default value of the session status
@@ -289,56 +298,51 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
rs2.close()
},
- // accessing the cached data in another session
+ // try to access the cached data in another session
{ statement =>
- val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
- val buf1 = new collection.mutable.ArrayBuffer[Int]()
- while (rs1.next()) {
- buf1 += rs1.getInt(1)
+ // Cached temporary table can't be accessed by other sessions
+ intercept[SQLException] {
+ statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
}
- rs1.close()
- val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
- val buf2 = new collection.mutable.ArrayBuffer[Int]()
- while (rs2.next()) {
- buf2 += rs2.getInt(1)
+ val plan = statement.executeQuery("explain select key from test_map ORDER BY key DESC")
+ plan.next()
+ plan.next()
+ assert(plan.getString(1).contains("InMemoryColumnarTableScan"))
+
+ val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf = new collection.mutable.ArrayBuffer[Int]()
+ while (rs.next()) {
+ buf += rs.getInt(1)
}
- rs2.close()
+ rs.close()
+ assert(buf === data)
+ },
- assert(buf1 === buf2)
- statement.executeQuery("UNCACHE TABLE test_table")
+ // switch to another database
+ { statement =>
+ statement.execute("USE db1")
- // TODO need to figure out how to determine if the data loaded from cache
- val rs3 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
- val buf3 = new collection.mutable.ArrayBuffer[Int]()
- while (rs3.next()) {
- buf3 += rs3.getInt(1)
+ // there is no test_map table in db1
+ intercept[SQLException] {
+ statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
}
- rs3.close()
- assert(buf1 === buf3)
+ statement.execute("CREATE TABLE test_map2(key INT, value STRING)")
},
- // accessing the uncached table
+ // access default database
{ statement =>
- // TODO need to figure out how to determine if the data loaded from cache
- val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
- val buf1 = new collection.mutable.ArrayBuffer[Int]()
- while (rs1.next()) {
- buf1 += rs1.getInt(1)
- }
- rs1.close()
-
- val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
- val buf2 = new collection.mutable.ArrayBuffer[Int]()
- while (rs2.next()) {
- buf2 += rs2.getInt(1)
+ // current database should still be `default`
+ intercept[SQLException] {
+ statement.executeQuery("SELECT key FROM test_map2")
}
- rs2.close()
- assert(buf1 === buf2)
+ statement.execute("USE db1")
+ // access test_map2
+ statement.executeQuery("SELECT key from test_map2")
}
)
}