diff options
author | Dong Wang <dong@databricks.com> | 2015-06-05 17:41:12 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-06-05 17:41:12 -0700 |
commit | eb19d3f75cbd002f7e72ce02017a8de67f562792 (patch) | |
tree | 67abe7c0b12822d9bd0f199026a2d080d5387a84 /sql/hive-thriftserver/src/main | |
parent | 6ebe419f335fcfb66dd3da74baf35eb5b2fc061d (diff) | |
download | spark-eb19d3f75cbd002f7e72ce02017a8de67f562792.tar.gz spark-eb19d3f75cbd002f7e72ce02017a8de67f562792.tar.bz2 spark-eb19d3f75cbd002f7e72ce02017a8de67f562792.zip |
[SPARK-6964] [SQL] Support Cancellation in the Thrift Server
Support runInBackground in SparkExecuteStatementOperation, and add cancellation
Author: Dong Wang <dong@databricks.com>
Closes #6207 from dongwang218/SPARK-6964-jdbc-cancel and squashes the following commits:
687c113 [Dong Wang] fix 100 characters
7bfa2a7 [Dong Wang] fix merge
380480f [Dong Wang] fix for liancheng's comments
eb3e385 [Dong Wang] small nit
341885b [Dong Wang] small fix
3d8ebf8 [Dong Wang] add spark.sql.hive.thriftServer.async flag
04142c3 [Dong Wang] set SQLSession for async execution
184ec35 [Dong Wang] keep hive conf
819ae03 [Dong Wang] [SPARK-6964][SQL][WIP] Support Cancellation in the Thrift Server
Diffstat (limited to 'sql/hive-thriftserver/src/main')
2 files changed, 157 insertions, 14 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index c0d1266212..e071103df9 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -17,11 +17,23 @@ package org.apache.spark.sql.hive.thriftserver +import java.security.PrivilegedExceptionAction import java.sql.{Date, Timestamp} +import java.util.concurrent.RejectedExecutionException import java.util.{Map => JMap, UUID} +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, Map => SMap} +import scala.util.control.NonFatal + +import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hive.service.cli._ +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.metadata.HiveException +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.hive.shims.ShimLoader +import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession @@ -31,8 +43,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf} -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, Map => SMap} private[hive] class SparkExecuteStatementOperation( parentSession: HiveSession, @@ -40,17 +50,19 @@ private[hive] class SparkExecuteStatementOperation( confOverlay: JMap[String, String], runInBackground: Boolean = true) (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String]) - // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution - extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) + extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with Logging { private var result: DataFrame = _ private var iter: Iterator[SparkRow] = _ private var dataTypes: Array[DataType] = _ + private var statementId: String = _ def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. - logDebug("CLOSING") + hiveContext.sparkContext.clearJobGroup() + logDebug(s"CLOSING $statementId") + cleanup(OperationState.CLOSED) } def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) { @@ -114,10 +126,10 @@ private[hive] class SparkExecuteStatementOperation( } def getResultSetSchema: TableSchema = { - logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") - if (result.queryExecution.analyzed.output.size == 0) { + if (result == null || result.queryExecution.analyzed.output.size == 0) { new TableSchema(new FieldSchema("Result", "string", "") :: Nil) } else { + logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") val schema = result.queryExecution.analyzed.output.map { attr => new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") } @@ -125,9 +137,73 @@ private[hive] class SparkExecuteStatementOperation( } } - def run(): Unit = { - val statementId = UUID.randomUUID().toString - logInfo(s"Running query '$statement'") + override def run(): Unit = { + setState(OperationState.PENDING) + setHasResultSet(true) // avoid no resultset for async run + + if (!runInBackground) { + runInternal() + } else { + val parentSessionState = SessionState.get() + val hiveConf = getConfigForOperation() + val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf) + val sessionHive = getCurrentHive() + val currentSqlSession = hiveContext.currentSession + + // Runnable impl to call runInternal asynchronously, + // from a different thread + val backgroundOperation = new Runnable() { + + override def run(): Unit = { + val doAsAction = new PrivilegedExceptionAction[Object]() { + override def run(): Object = { + + // User information is part of the metastore client member in Hive + hiveContext.setSession(currentSqlSession) + Hive.set(sessionHive) + SessionState.setCurrentSessionState(parentSessionState) + try { + runInternal() + } catch { + case e: HiveSQLException => + setOperationException(e) + log.error("Error running hive query: ", e) + } + return null + } + } + + try { + ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction) + } catch { + case e: Exception => + setOperationException(new HiveSQLException(e)) + logError("Error running hive query as user : " + + sparkServiceUGI.getShortUserName(), e) + } + } + } + try { + // This submit blocks if no background threads are available to run this operation + val backgroundHandle = + getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation) + setBackgroundHandle(backgroundHandle) + } catch { + case rejected: RejectedExecutionException => + setState(OperationState.ERROR) + throw new HiveSQLException("The background threadpool cannot accept" + + " new task for execution, please retry the operation", rejected) + case NonFatal(e) => + logError(s"Error executing query in background", e) + setState(OperationState.ERROR) + throw e + } + } + } + + private def runInternal(): Unit = { + statementId = UUID.randomUUID().toString + logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) HiveThriftServer2.listener.onStatementStart( statementId, @@ -159,18 +235,82 @@ private[hive] class SparkExecuteStatementOperation( } } dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray - setHasResultSet(true) } catch { + case e: HiveSQLException => + if (getStatus().getState() == OperationState.CANCELED) { + return + } else { + setState(OperationState.ERROR); + throw e + } // Actually do need to catch Throwable as some failures don't inherit from Exception and // HiveServer will silently swallow them. case e: Throwable => + val currentState = getStatus().getState() + logError(s"Error executing query, currentState $currentState, ", e) setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, e.getStackTraceString) - logError("Error executing query:", e) throw new HiveSQLException(e.toString) } setState(OperationState.FINISHED) HiveThriftServer2.listener.onStatementFinish(statementId) } + + override def cancel(): Unit = { + logInfo(s"Cancel '$statement' with $statementId") + if (statementId != null) { + hiveContext.sparkContext.cancelJobGroup(statementId) + } + cleanup(OperationState.CANCELED) + } + + private def cleanup(state: OperationState) { + setState(state) + if (runInBackground) { + val backgroundHandle = getBackgroundHandle() + if (backgroundHandle != null) { + backgroundHandle.cancel(true) + } + } + } + + /** + * If there are query specific settings to overlay, then create a copy of config + * There are two cases we need to clone the session config that's being passed to hive driver + * 1. Async query - + * If the client changes a config setting, that shouldn't reflect in the execution + * already underway + * 2. confOverlay - + * The query specific settings should only be applied to the query config and not session + * @return new configuration + * @throws HiveSQLException + */ + private def getConfigForOperation(): HiveConf = { + var sqlOperationConf = getParentSession().getHiveConf() + if (!getConfOverlay().isEmpty() || runInBackground) { + // clone the partent session config for this query + sqlOperationConf = new HiveConf(sqlOperationConf) + + // apply overlay query specific settings, if any + getConfOverlay().foreach { case (k, v) => + try { + sqlOperationConf.verifyAndSet(k, v) + } catch { + case e: IllegalArgumentException => + throw new HiveSQLException("Error applying statement specific settings", e) + } + } + } + return sqlOperationConf + } + + private def getCurrentHive(): Hive = { + try { + return Hive.get() + } catch { + case e: HiveException => + throw new HiveSQLException("Failed to get current Hive object", e); + } + } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 9c0bf02391..c8031ed0f3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -44,9 +44,12 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { - val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)( - hiveContext, sessionToActivePool) + val runInBackground = async && hiveContext.hiveThriftServerAsync + val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, + runInBackground)(hiveContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) + logDebug(s"Created Operation for $statement with session=$parentSession, " + + s"runInBackground=$runInBackground") operation } } |