author    Dong Wang <dong@databricks.com>    2015-06-05 17:41:12 -0700
committer Yin Huai <yhuai@databricks.com>    2015-06-05 17:41:12 -0700
commit    eb19d3f75cbd002f7e72ce02017a8de67f562792 (patch)
tree      67abe7c0b12822d9bd0f199026a2d080d5387a84 /sql/hive-thriftserver
parent    6ebe419f335fcfb66dd3da74baf35eb5b2fc061d (diff)
[SPARK-6964] [SQL] Support Cancellation in the Thrift Server
Support runInBackground in SparkExecuteStatementOperation, and add cancellation.

Author: Dong Wang <dong@databricks.com>

Closes #6207 from dongwang218/SPARK-6964-jdbc-cancel and squashes the following commits:

687c113 [Dong Wang] fix 100 characters
7bfa2a7 [Dong Wang] fix merge
380480f [Dong Wang] fix for liancheng's comments
eb3e385 [Dong Wang] small nit
341885b [Dong Wang] small fix
3d8ebf8 [Dong Wang] add spark.sql.hive.thriftServer.async flag
04142c3 [Dong Wang] set SQLSession for async execution
184ec35 [Dong Wang] keep hive conf
819ae03 [Dong Wang] [SPARK-6964][SQL][WIP] Support Cancellation in the Thrift Server
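This commit makes running statements cancellable through the standard JDBC API: the server executes the statement on a background thread, and Statement.cancel() aborts the backing Spark jobs. A minimal client-side sketch of that flow, assuming a Thrift server on the default jdbc:hive2://localhost:10000 endpoint and the Hive JDBC driver on the classpath (endpoint, credentials, and table name are illustrative, not part of this commit):

import java.sql.{DriverManager, SQLException}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object JdbcCancelSketch {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Endpoint and credentials are assumptions for illustration only.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
    val stmt = conn.createStatement()

    // Cancel the statement from another thread shortly after it starts.
    val canceller = Future { Thread.sleep(100); stmt.cancel() }

    try {
      stmt.executeQuery("SELECT COUNT(*) FROM some_large_table")
    } catch {
      // With this patch the server kills the statement's Spark jobs and the
      // driver surfaces the cancellation as a SQLException.
      case e: SQLException => println(s"Query cancelled: ${e.getMessage}")
    } finally {
      Await.ready(canceller, 1.minute)
      stmt.close()
      conn.close()
    }
  }
}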
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala   | 164
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala  |   7
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala          |  42
3 files changed, 197 insertions(+), 16 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index c0d1266212..e071103df9 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -17,11 +17,23 @@
package org.apache.spark.sql.hive.thriftserver
+import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
+import java.util.concurrent.RejectedExecutionException
import java.util.{Map => JMap, UUID}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hive.service.cli._
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
@@ -31,8 +43,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
private[hive] class SparkExecuteStatementOperation(
parentSession: HiveSession,
@@ -40,17 +50,19 @@ private[hive] class SparkExecuteStatementOperation(
confOverlay: JMap[String, String],
runInBackground: Boolean = true)
(hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
- // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
- extends ExecuteStatementOperation(parentSession, statement, confOverlay, false)
+ extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with Logging {
private var result: DataFrame = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
+ private var statementId: String = _
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
- logDebug("CLOSING")
+ hiveContext.sparkContext.clearJobGroup()
+ logDebug(s"CLOSING $statementId")
+ cleanup(OperationState.CLOSED)
}
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
@@ -114,10 +126,10 @@ private[hive] class SparkExecuteStatementOperation(
}
def getResultSetSchema: TableSchema = {
- logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
- if (result.queryExecution.analyzed.output.size == 0) {
+ if (result == null || result.queryExecution.analyzed.output.size == 0) {
new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
} else {
+ logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
val schema = result.queryExecution.analyzed.output.map { attr =>
new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}
@@ -125,9 +137,73 @@ private[hive] class SparkExecuteStatementOperation(
}
}
- def run(): Unit = {
- val statementId = UUID.randomUUID().toString
- logInfo(s"Running query '$statement'")
+ override def run(): Unit = {
+ setState(OperationState.PENDING)
+ setHasResultSet(true) // avoid no resultset for async run
+
+ if (!runInBackground) {
+ runInternal()
+ } else {
+ val parentSessionState = SessionState.get()
+ val hiveConf = getConfigForOperation()
+ val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+ val sessionHive = getCurrentHive()
+ val currentSqlSession = hiveContext.currentSession
+
+ // Runnable impl to call runInternal asynchronously,
+ // from a different thread
+ val backgroundOperation = new Runnable() {
+
+ override def run(): Unit = {
+ val doAsAction = new PrivilegedExceptionAction[Object]() {
+ override def run(): Object = {
+
+ // User information is part of the metastore client member in Hive
+ hiveContext.setSession(currentSqlSession)
+ Hive.set(sessionHive)
+ SessionState.setCurrentSessionState(parentSessionState)
+ try {
+ runInternal()
+ } catch {
+ case e: HiveSQLException =>
+ setOperationException(e)
+ log.error("Error running hive query: ", e)
+ }
+ return null
+ }
+ }
+
+ try {
+ ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction)
+ } catch {
+ case e: Exception =>
+ setOperationException(new HiveSQLException(e))
+ logError("Error running hive query as user : " +
+ sparkServiceUGI.getShortUserName(), e)
+ }
+ }
+ }
+ try {
+ // This submit blocks if no background threads are available to run this operation
+ val backgroundHandle =
+ getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+ setBackgroundHandle(backgroundHandle)
+ } catch {
+ case rejected: RejectedExecutionException =>
+ setState(OperationState.ERROR)
+ throw new HiveSQLException("The background threadpool cannot accept" +
+ " new task for execution, please retry the operation", rejected)
+ case NonFatal(e) =>
+ logError(s"Error executing query in background", e)
+ setState(OperationState.ERROR)
+ throw e
+ }
+ }
+ }
+
+ private def runInternal(): Unit = {
+ statementId = UUID.randomUUID().toString
+ logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
HiveThriftServer2.listener.onStatementStart(
statementId,
@@ -159,18 +235,82 @@ private[hive] class SparkExecuteStatementOperation(
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
- setHasResultSet(true)
} catch {
+ case e: HiveSQLException =>
+ if (getStatus().getState() == OperationState.CANCELED) {
+ return
+ } else {
+ setState(OperationState.ERROR);
+ throw e
+ }
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
+ val currentState = getStatus().getState()
+ logError(s"Error executing query, currentState $currentState, ", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, e.getStackTraceString)
- logError("Error executing query:", e)
throw new HiveSQLException(e.toString)
}
setState(OperationState.FINISHED)
HiveThriftServer2.listener.onStatementFinish(statementId)
}
+
+ override def cancel(): Unit = {
+ logInfo(s"Cancel '$statement' with $statementId")
+ if (statementId != null) {
+ hiveContext.sparkContext.cancelJobGroup(statementId)
+ }
+ cleanup(OperationState.CANCELED)
+ }
+
+ private def cleanup(state: OperationState) {
+ setState(state)
+ if (runInBackground) {
+ val backgroundHandle = getBackgroundHandle()
+ if (backgroundHandle != null) {
+ backgroundHandle.cancel(true)
+ }
+ }
+ }
+
+ /**
+ * If there are query specific settings to overlay, then create a copy of config
+ * There are two cases we need to clone the session config that's being passed to hive driver
+ * 1. Async query -
+ * If the client changes a config setting, that shouldn't reflect in the execution
+ * already underway
+ * 2. confOverlay -
+ * The query specific settings should only be applied to the query config and not session
+ * @return new configuration
+ * @throws HiveSQLException
+ */
+ private def getConfigForOperation(): HiveConf = {
+ var sqlOperationConf = getParentSession().getHiveConf()
+ if (!getConfOverlay().isEmpty() || runInBackground) {
+ // clone the parent session config for this query
+ sqlOperationConf = new HiveConf(sqlOperationConf)
+
+ // apply overlay query specific settings, if any
+ getConfOverlay().foreach { case (k, v) =>
+ try {
+ sqlOperationConf.verifyAndSet(k, v)
+ } catch {
+ case e: IllegalArgumentException =>
+ throw new HiveSQLException("Error applying statement specific settings", e)
+ }
+ }
+ }
+ return sqlOperationConf
+ }
+
+ private def getCurrentHive(): Hive = {
+ try {
+ return Hive.get()
+ } catch {
+ case e: HiveException =>
+ throw new HiveSQLException("Failed to get current Hive object", e);
+ }
+ }
}
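The cancellation added above ultimately rests on Spark's job-group mechanism: cancel() calls sparkContext.cancelJobGroup(statementId) (visible in the diff), and the statement's jobs are tagged with that id via setJobGroup in code not shown in this hunk. A standalone sketch of that underlying pattern, independent of the Thrift server code (all names here are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object JobGroupCancelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cancel-sketch").setMaster("local[2]"))
    val groupId = java.util.UUID.randomUUID().toString

    // A worker thread runs a long job under a named group, the way each
    // statement runs under its statementId.
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        sc.setJobGroup(groupId, "long running query", interruptOnCancel = true)
        try {
          sc.parallelize(1 to 1000000, 100).map { i => Thread.sleep(1); i }.count()
        } catch {
          case e: Exception => println(s"Job group cancelled: ${e.getMessage}")
        }
      }
    })
    worker.start()

    // Another thread cancels the whole group, as cancel() does for a statement.
    Thread.sleep(500)
    sc.cancelJobGroup(groupId)
    worker.join()
    sc.stop()
  }
}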
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 9c0bf02391..c8031ed0f3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -44,9 +44,12 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
- val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)(
- hiveContext, sessionToActivePool)
+ val runInBackground = async && hiveContext.hiveThriftServerAsync
+ val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
+ runInBackground)(hiveContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)
+ logDebug(s"Created Operation for $statement with session=$parentSession, " +
+ s"runInBackground=$runInBackground")
operation
}
}
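With this change, whether a statement runs in the background is decided per operation: runInBackground = async && hiveContext.hiveThriftServerAsync, where the latter is, per the commit message, controlled by the new spark.sql.hive.thriftServer.async flag. A hedged sketch of disabling asynchronous execution for a session over JDBC, which the test below also relies on (connection URL is illustrative); after the SET, statements run synchronously on the handler thread and cancel() becomes a no-op:

import java.sql.DriverManager

object AsyncFlagSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative endpoint; not part of this commit.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
    val stmt = conn.createStatement()

    // Disable background execution for this session.
    stmt.execute("SET spark.sql.hive.thriftServer.async=false")

    // Subsequent statements execute synchronously on the Thrift handler thread.
    val rs = stmt.executeQuery("SHOW TABLES")
    while (rs.next()) println(rs.getString(1))

    rs.close()
    stmt.close()
    conn.close()
  }
}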
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index f57c7083ea..178bd1f5cb 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive.thriftserver
import java.io.File
import java.net.URL
-import java.sql.{Date, DriverManager, Statement}
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, DriverManager, SQLException, Statement}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.{Await, Promise, future}
+import scala.concurrent.ExecutionContext.Implicits.global
import scala.sys.process.{Process, ProcessLogger}
import scala.util.{Random, Try}
@@ -338,6 +340,42 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
)
}
+
+ test("test jdbc cancel") {
+ withJdbcStatement { statement =>
+ val queries = Seq(
+ "DROP TABLE IF EXISTS test_map",
+ "CREATE TABLE test_map(key INT, value STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+ queries.foreach(statement.execute)
+
+ val largeJoin = "SELECT COUNT(*) FROM test_map " +
+ List.fill(10)("join test_map").mkString(" ")
+ val f = future { Thread.sleep(100); statement.cancel(); }
+ val e = intercept[SQLException] {
+ statement.executeQuery(largeJoin)
+ }
+ assert(e.getMessage contains "cancelled")
+ Await.result(f, 3.minute)
+
+ // cancel is a noop
+ statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
+ val sf = future { Thread.sleep(100); statement.cancel(); }
+ val smallJoin = "SELECT COUNT(*) FROM test_map " +
+ List.fill(4)("join test_map").mkString(" ")
+ val rs1 = statement.executeQuery(smallJoin)
+ Await.result(sf, 3.minute)
+ rs1.next()
+ assert(rs1.getInt(1) === math.pow(5, 5))
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
+ rs2.next()
+ assert(rs2.getInt(1) === 5)
+ rs2.close()
+ }
+ }
}
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {