From 8cf12279969afe5099c66ad16897db366e7234ed Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 28 Nov 2014 11:42:40 -0500
Subject: [SPARK-4645][SQL] Disables asynchronous execution in Hive 0.13.1
 HiveThriftServer2

This PR disables asynchronous execution in HiveThriftServer2 by setting the
`runInBackground` argument of `ExecuteStatementOperation` to `false` and by
reverting `SparkExecuteStatementOperation.run` in the Hive 0.13.1 shim to the
Hive 0.12 version. This change makes Simba ODBC driver v1.0.0.1000 work.

[Review on Reviewable](https://reviewable.io/reviews/apache/spark/3506)

Author: Cheng Lian

Closes #3506 from liancheng/disable-async-exec and squashes the following commits:

593804d [Cheng Lian] Disables asynchronous execution in Hive 0.13.1 HiveThriftServer2
---
 .../spark/sql/hive/thriftserver/Shim13.scala  | 139 ++++++--------------
 1 file changed, 39 insertions(+), 100 deletions(-)

diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 99c1987158..17f1ad3e46 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -17,30 +17,25 @@
 package org.apache.spark.sql.hive.thriftserver
 
-import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
-import java.util.concurrent.Future
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 import scala.math._
 
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
 
 /**
  * A compatibility layer for interacting with Hive version 0.13.1.
@@ -48,7 +43,9 @@ import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
 private[thriftserver] object HiveThriftServerShim {
   val version = "0.13.1"
 
-  def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
+  def setServerUserName(
+      sparkServiceUGI: UserGroupInformation,
+      sparkCliService:SparkSQLCLIService) = {
     setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
   }
 }
@@ -72,39 +69,14 @@ private[hive] class SparkExecuteStatementOperation(
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)(
     hiveContext: HiveContext,
-    sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
-    parentSession, statement, confOverlay, runInBackground) with Logging {
+    sessionToActivePool: SMap[HiveSession, String])
+  // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {
 
   private var result: SchemaRDD = _
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
 
-  private def runInternal(cmd: String) = {
-    try {
-      result = hiveContext.sql(cmd)
-      logDebug(result.queryExecution.toString())
-      val groupId = round(random * 1000000).toString
-      hiveContext.sparkContext.setJobGroup(groupId, statement)
-      iter = {
-        val useIncrementalCollect =
-          hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
-        if (useIncrementalCollect) {
-          result.toLocalIterator
-        } else {
-          result.collect().iterator
-        }
-      }
-      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-    } catch {
-      // Actually do need to catch Throwable as some failures don't inherit from Exception and
-      // HiveServer will silently swallow them.
-      case e: Throwable =>
-        setState(OperationState.ERROR)
-        logError("Error executing query:",e)
-        throw new HiveSQLException(e.toString)
-    }
-  }
-
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING") @@ -182,76 +154,43 @@ private[hive] class SparkExecuteStatementOperation( } } - private def getConfigForOperation: HiveConf = { - var sqlOperationConf: HiveConf = getParentSession.getHiveConf - if (!getConfOverlay.isEmpty || shouldRunAsync) { - sqlOperationConf = new HiveConf(sqlOperationConf) - import scala.collection.JavaConversions._ - for (confEntry <- getConfOverlay.entrySet) { - try { - sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue) - } - catch { case e: IllegalArgumentException => - throw new HiveSQLException("Error applying statement specific settings", e) - } - } - } - sqlOperationConf - } - def run(): Unit = { logInfo(s"Running query '$statement'") - val opConfig: HiveConf = getConfigForOperation setState(OperationState.RUNNING) - setHasResultSet(true) - - if (!shouldRunAsync) { - runInternal(statement) - setState(OperationState.FINISHED) - } else { - val parentSessionState = SessionState.get - val sessionHive: Hive = Hive.get - val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig) - - val backgroundOperation: Runnable = new Runnable { - def run() { - val doAsAction: PrivilegedExceptionAction[AnyRef] = - new PrivilegedExceptionAction[AnyRef] { - def run: AnyRef = { - Hive.set(sessionHive) - SessionState.setCurrentSessionState(parentSessionState) - try { - runInternal(statement) - } - catch { case e: HiveSQLException => - setOperationException(e) - logError("Error running hive query: ", e) - } - null - } - } - try { - ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction) - } - catch { case e: Exception => - setOperationException(new HiveSQLException(e)) - logError("Error running hive query as user : " + currentUGI.getShortUserName, e) - } - setState(OperationState.FINISHED) - } + try { + result = hiveContext.sql(statement) + logDebug(result.queryExecution.toString()) + result.queryExecution.logical match { + case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) => + sessionToActivePool(parentSession) = value + logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") + case _ => } - try { - val backgroundHandle: Future[_] = getParentSession.getSessionManager. - submitBackgroundOperation(backgroundOperation) - setBackgroundHandle(backgroundHandle) - } catch { - // Actually do need to catch Throwable as some failures don't inherit from Exception and - // HiveServer will silently swallow them. - case e: Throwable => - logError("Error executing query:",e) - throw new HiveSQLException(e.toString) + val groupId = round(random * 1000000).toString + hiveContext.sparkContext.setJobGroup(groupId, statement) + sessionToActivePool.get(parentSession).foreach { pool => + hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) + } + iter = { + val useIncrementalCollect = + hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + if (useIncrementalCollect) { + result.toLocalIterator + } else { + result.collect().iterator + } } + dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray + setHasResultSet(true) + } catch { + // Actually do need to catch Throwable as some failures don't inherit from Exception and + // HiveServer will silently swallow them. + case e: Throwable => + setState(OperationState.ERROR) + logError("Error executing query:", e) + throw new HiveSQLException(e.toString) } + setState(OperationState.FINISHED) } } -- cgit v1.2.3