Diffstat (limited to 'sql/hive-thriftserver/src')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 76
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala | 9
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala | 5
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 8
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 76
5 files changed, 63 insertions, 111 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 306f98bcb5..719b03e1c7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -20,19 +20,15 @@ package org.apache.spark.sql.hive.thriftserver
import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
import java.util.concurrent.RejectedExecutionException
-import java.util.{Arrays, Map => JMap, UUID}
+import java.util.{Arrays, UUID, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.util.control.NonFatal
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hive.service.cli._
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.metadata.HiveException
-import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.Utils
+import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
@@ -40,7 +36,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
+import org.apache.spark.sql.{DataFrame, SQLConf, Row => SparkRow}
private[hive] class SparkExecuteStatementOperation(
@@ -143,30 +139,15 @@ private[hive] class SparkExecuteStatementOperation(
if (!runInBackground) {
runInternal()
} else {
- val parentSessionState = SessionState.get()
- val hiveConf = getConfigForOperation()
val sparkServiceUGI = Utils.getUGI()
- val sessionHive = getCurrentHive()
- val currentSqlSession = hiveContext.currentSession
// Runnable impl to call runInternal asynchronously,
// from a different thread
val backgroundOperation = new Runnable() {
override def run(): Unit = {
- val doAsAction = new PrivilegedExceptionAction[Object]() {
- override def run(): Object = {
-
- // User information is part of the metastore client member in Hive
- hiveContext.setSession(currentSqlSession)
- // Always use the latest class loader provided by executionHive's state.
- val executionHiveClassLoader =
- hiveContext.executionHive.state.getConf.getClassLoader
- sessionHive.getConf.setClassLoader(executionHiveClassLoader)
- parentSessionState.getConf.setClassLoader(executionHiveClassLoader)
-
- Hive.set(sessionHive)
- SessionState.setCurrentSessionState(parentSessionState)
+ val doAsAction = new PrivilegedExceptionAction[Unit]() {
+ override def run(): Unit = {
try {
runInternal()
} catch {
@@ -174,7 +155,6 @@ private[hive] class SparkExecuteStatementOperation(
setOperationException(e)
log.error("Error running hive query: ", e)
}
- return null
}
}
@@ -191,7 +171,7 @@ private[hive] class SparkExecuteStatementOperation(
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
- getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+ parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
@@ -210,6 +190,11 @@ private[hive] class SparkExecuteStatementOperation(
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
+ // Always use the latest class loader provided by executionHive's state.
+ val executionHiveClassLoader =
+ hiveContext.executionHive.state.getConf.getClassLoader
+ Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
+
HiveThriftServer2.listener.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
@@ -279,43 +264,4 @@ private[hive] class SparkExecuteStatementOperation(
}
}
}
-
- /**
- * If there are query specific settings to overlay, then create a copy of config
- * There are two cases we need to clone the session config that's being passed to hive driver
- * 1. Async query -
- * If the client changes a config setting, that shouldn't reflect in the execution
- * already underway
- * 2. confOverlay -
- * The query specific settings should only be applied to the query config and not session
- * @return new configuration
- * @throws HiveSQLException
- */
- private def getConfigForOperation(): HiveConf = {
- var sqlOperationConf = getParentSession().getHiveConf()
- if (!getConfOverlay().isEmpty() || runInBackground) {
- // clone the parent session config for this query
- sqlOperationConf = new HiveConf(sqlOperationConf)
-
- // apply overlay query specific settings, if any
- getConfOverlay().asScala.foreach { case (k, v) =>
- try {
- sqlOperationConf.verifyAndSet(k, v)
- } catch {
- case e: IllegalArgumentException =>
- throw new HiveSQLException("Error applying statement specific settings", e)
- }
- }
- }
- return sqlOperationConf
- }
-
- private def getCurrentHive(): Hive = {
- try {
- return Hive.get()
- } catch {
- case e: HiveException =>
- throw new HiveSQLException("Failed to get current Hive object", e);
- }
- }
}
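
Note: with a HiveContext per session (see the session-manager changes below), the per-operation Hive/SessionState snapshotting removed above becomes unnecessary; the background task only needs to run under the service UGI, and class-loader setup moves next to setState(OperationState.RUNNING). A minimal, self-contained sketch of that doAs pattern, with an illustrative body standing in for runInternal():

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    // Run a unit of work with the current user's credentials, mirroring how
    // the background operation above wraps runInternal().
    val ugi: UserGroupInformation = UserGroupInformation.getCurrentUser
    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = {
        // Illustrative body; the real operation calls runInternal() and
        // records failures via setOperationException.
        println(s"running as ${UserGroupInformation.getCurrentUser.getUserName}")
      }
    })
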
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 92ac0ec3fc..33aaead3fb 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -36,7 +36,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
extends SessionManager(hiveServer)
with ReflectedCompositeService {
- private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+ private lazy val sparkSqlOperationManager = new SparkSQLOperationManager()
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
@@ -60,13 +60,15 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
sessionConf: java.util.Map[String, String],
withImpersonation: Boolean,
delegationToken: String): SessionHandle = {
- hiveContext.openSession()
val sessionHandle =
super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation,
delegationToken)
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
+ val ctx = hiveContext.newSession()
+ ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
+ sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
sessionHandle
}
@@ -74,7 +76,6 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
super.closeSession(sessionHandle)
sparkSqlOperationManager.sessionToActivePool -= sessionHandle
-
- hiveContext.detachSession()
+ sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
}
}
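
Note: instead of sharing one global HiveContext, each Thrift session now forks an isolated child via newSession() and registers it under its SessionHandle. A hedged, generic sketch of that lifecycle (SessionRegistry and its type parameters are invented for illustration):

    import scala.collection.mutable

    // Invented helper illustrating the handle -> context bookkeeping above.
    class SessionRegistry[H, C](root: C, fork: C => C) {
      private val contexts = mutable.Map[H, C]()
      def open(handle: H): C = synchronized {
        val ctx = fork(root)   // plays the role of hiveContext.newSession()
        contexts += handle -> ctx
        ctx
      }
      def lookup(handle: H): C = synchronized { contexts(handle) }
      def close(handle: H): Unit = synchronized { contexts.remove(handle) }
    }
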
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index c8031ed0f3..476651a559 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -30,20 +30,21 @@ import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, R
/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
*/
-private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
+private[thriftserver] class SparkSQLOperationManager()
extends OperationManager with Logging {
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
val sessionToActivePool = Map[SessionHandle, String]()
+ val sessionToContexts = Map[SessionHandle, HiveContext]()
override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
-
+ val hiveContext = sessionToContexts(parentSession.getSessionHandle)
val runInBackground = async && hiveContext.hiveThriftServerAsync
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
runInBackground)(hiveContext, sessionToActivePool)
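
Note: the operation manager now resolves the session's own context at statement time instead of holding a single shared one. A standalone sketch of that lookup and its failure mode (String and a mutable.Map are invented stand-ins for SessionHandle and HiveContext):

    import scala.collection.mutable

    val sessionToContexts = mutable.Map[String, mutable.Map[String, String]]()
    sessionToContexts += "session-1" -> mutable.Map("spark.sql.hive.version" -> "1.2.1")

    // What newExecuteStatementOperation does: fetch this session's context.
    val hiveContext = sessionToContexts("session-1")

    // An unknown handle fails fast, like a statement on a session that was
    // never opened:
    try sessionToContexts("session-2")
    catch { case _: NoSuchElementException => /* no such session */ }
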
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index e59a14ec00..76d1591a23 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -96,7 +96,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
buffer += s"${new Timestamp(new Date().getTime)} - $source> $line"
// If we haven't found all expected answers and another expected answer comes up...
- if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+ if (next < expectedAnswers.size && line.contains(expectedAnswers(next))) {
next += 1
// If all expected answers have been found...
if (next == expectedAnswers.size) {
@@ -159,7 +159,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;"
-> "OK",
"CACHE TABLE hive_test;"
- -> "Time taken: ",
+ -> "",
"SELECT COUNT(*) FROM hive_test;"
-> "5",
"DROP TABLE hive_test;"
@@ -180,7 +180,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
"CREATE TABLE hive_test(key INT, val STRING);"
-> "OK",
"SHOW TABLES;"
- -> "Time taken: "
+ -> "hive_test"
)
runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))(
@@ -210,7 +210,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
-> "OK",
"INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
- -> "Time taken:",
+ -> "",
"SELECT count(key) FROM t1;"
-> "5",
"DROP TABLE t1;"
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 19b2f24456..ff8ca01506 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -205,6 +205,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
import org.apache.spark.sql.SQLConf
var defaultV1: String = null
var defaultV2: String = null
+ var data: ArrayBuffer[Int] = null
withMultipleConnectionJdbcStatement(
// create table
@@ -214,10 +215,16 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
"DROP TABLE IF EXISTS test_map",
"CREATE TABLE test_map(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map",
- "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC")
+ "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC",
+ "CREATE DATABASE db1")
queries.foreach(statement.execute)
+ val plan = statement.executeQuery("explain select * from test_table")
+ plan.next()
+ plan.next()
+ assert(plan.getString(1).contains("InMemoryColumnarTableScan"))
+
val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
val buf1 = new collection.mutable.ArrayBuffer[Int]()
while (rs1.next()) {
@@ -233,6 +240,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
rs2.close()
assert(buf1 === buf2)
+
+ data = buf1
},
// first session, we get the default value of the session status
@@ -289,56 +298,51 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
rs2.close()
},
- // accessing the cached data in another session
+ // try to access the cached data in another session
{ statement =>
- val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
- val buf1 = new collection.mutable.ArrayBuffer[Int]()
- while (rs1.next()) {
- buf1 += rs1.getInt(1)
+ // Cached temporary tables can't be accessed by other sessions
+ intercept[SQLException] {
+ statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
}
- rs1.close()
- val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
- val buf2 = new collection.mutable.ArrayBuffer[Int]()
- while (rs2.next()) {
- buf2 += rs2.getInt(1)
+ val plan = statement.executeQuery("explain select key from test_map ORDER BY key DESC")
+ plan.next()
+ plan.next()
+ assert(plan.getString(1).contains("InMemoryColumnarTableScan"))
+
+ val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf = new collection.mutable.ArrayBuffer[Int]()
+ while (rs.next()) {
+ buf += rs.getInt(1)
}
- rs2.close()
+ rs.close()
+ assert(buf === data)
+ },
- assert(buf1 === buf2)
- statement.executeQuery("UNCACHE TABLE test_table")
+ // switch to another database
+ { statement =>
+ statement.execute("USE db1")
- // TODO need to figure out how to determine if the data loaded from cache
- val rs3 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
- val buf3 = new collection.mutable.ArrayBuffer[Int]()
- while (rs3.next()) {
- buf3 += rs3.getInt(1)
+ // there is no test_map table in db1
+ intercept[SQLException] {
+ statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
}
- rs3.close()
- assert(buf1 === buf3)
+ statement.execute("CREATE TABLE test_map2(key INT, value STRING)")
},
- // accessing the uncached table
+ // access default database
{ statement =>
- // TODO need to figure out how to determine if the data loaded from cache
- val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
- val buf1 = new collection.mutable.ArrayBuffer[Int]()
- while (rs1.next()) {
- buf1 += rs1.getInt(1)
- }
- rs1.close()
-
- val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
- val buf2 = new collection.mutable.ArrayBuffer[Int]()
- while (rs2.next()) {
- buf2 += rs2.getInt(1)
+ // current database should still be `default`
+ intercept[SQLException] {
+ statement.executeQuery("SELECT key FROM test_map2")
}
- rs2.close()
- assert(buf1 === buf2)
+ statement.execute("USE db1")
+ // access test_map2
+ statement.executeQuery("SELECT key from test_map2")
}
)
}
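
Note: together these assertions pin down the new semantics: cached temporary tables and USE <db> are scoped to a single Thrift session. A hedged JDBC sketch of the same check (URL, port, and credentials are placeholders):

    import java.sql.{DriverManager, SQLException}

    // Each new connection gets its own session, and therefore its own context.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement()
    try {
      // test_table was cached as a temporary table in a *different* session,
      // so this session should not see it.
      stmt.executeQuery("SELECT key FROM test_table")
      sys.error("expected SQLException: temp tables are session-scoped")
    } catch {
      case _: SQLException => // expected
    } finally {
      stmt.close()
      conn.close()
    }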