author     Cheng Lian <lian@databricks.com>        2014-11-28 11:42:40 -0500
committer  Patrick Wendell <pwendell@gmail.com>    2014-11-28 12:00:10 -0500
commit     8cf12279969afe5099c66ad16897db366e7234ed (patch)
tree       c9f19fcc9ffc81fcacde1b3235a81de9086029f6 /sql/hive-thriftserver/v0.13.1
parent     7fa5fff29881421ef5da0ac3c254611b2318be00 (diff)
[SPARK-4645][SQL] Disables asynchronous execution in Hive 0.13.1 HiveThriftServer2
This PR disables HiveThriftServer2 asynchronous execution by setting the `runInBackground` argument of `ExecuteStatementOperation` to `false` and reverting `SparkExecuteStatementOperation.run` in the Hive 0.13.1 shim to the Hive 0.12 version. This change makes the Simba ODBC driver v1.0.0.1000 work.

Author: Cheng Lian <lian@databricks.com>

Closes #3506 from liancheng/disable-async-exec and squashes the following commits:

593804d [Cheng Lian] Disables asynchronous execution in Hive 0.13.1 HiveThriftServer2
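A minimal, self-contained Scala sketch of the pattern this commit applies. `Operation` below is a stand-in for Hive's `ExecuteStatementOperation`, not the real class; only the `runInBackground` flag and its effect are modeled:

import java.util.concurrent.Executors

abstract class Operation(runInBackground: Boolean) {
  private val pool = Executors.newSingleThreadExecutor()

  protected def execute(): Unit

  final def run(): Unit =
    if (runInBackground) {
      // Asynchronous path: the statement is submitted to a background pool
      // and run() returns immediately.
      pool.submit(new Runnable { def run(): Unit = execute() })
    } else {
      // Synchronous path: the statement finishes on the caller's thread
      // before run() returns. Hard-coding `false`, as this commit does,
      // forces every statement down this branch.
      execute()
    }
}

object SyncDemo extends App {
  val op = new Operation(runInBackground = false) {
    protected def execute(): Unit =
      println(s"executing on ${Thread.currentThread().getName}")
  }
  op.run() // prints "executing on main": execution happened synchronously
}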
Diffstat (limited to 'sql/hive-thriftserver/v0.13.1')
-rw-r--r--  sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala | 139
1 file changed, 39 insertions(+), 100 deletions(-)
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 99c1987158..17f1ad3e46 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -17,30 +17,25 @@
package org.apache.spark.sql.hive.thriftserver
-import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
-import java.util.concurrent.Future
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.math._
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
/**
* A compatibility layer for interacting with Hive version 0.13.1.
@@ -48,7 +43,9 @@ import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
private[thriftserver] object HiveThriftServerShim {
val version = "0.13.1"
- def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
+ def setServerUserName(
+ sparkServiceUGI: UserGroupInformation,
+ sparkCliService:SparkSQLCLIService) = {
setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
}
}
@@ -72,39 +69,14 @@ private[hive] class SparkExecuteStatementOperation(
confOverlay: JMap[String, String],
runInBackground: Boolean = true)(
hiveContext: HiveContext,
- sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
- parentSession, statement, confOverlay, runInBackground) with Logging {
+ sessionToActivePool: SMap[HiveSession, String])
+ // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
+ extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {
private var result: SchemaRDD = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
- private def runInternal(cmd: String) = {
- try {
- result = hiveContext.sql(cmd)
- logDebug(result.queryExecution.toString())
- val groupId = round(random * 1000000).toString
- hiveContext.sparkContext.setJobGroup(groupId, statement)
- iter = {
- val useIncrementalCollect =
- hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
- if (useIncrementalCollect) {
- result.toLocalIterator
- } else {
- result.collect().iterator
- }
- }
- dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
- } catch {
- // Actually do need to catch Throwable as some failures don't inherit from Exception and
- // HiveServer will silently swallow them.
- case e: Throwable =>
- setState(OperationState.ERROR)
- logError("Error executing query:",e)
- throw new HiveSQLException(e.toString)
- }
- }
-
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING")
@@ -182,76 +154,43 @@ private[hive] class SparkExecuteStatementOperation(
}
}
- private def getConfigForOperation: HiveConf = {
- var sqlOperationConf: HiveConf = getParentSession.getHiveConf
- if (!getConfOverlay.isEmpty || shouldRunAsync) {
- sqlOperationConf = new HiveConf(sqlOperationConf)
- import scala.collection.JavaConversions._
- for (confEntry <- getConfOverlay.entrySet) {
- try {
- sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
- }
- catch { case e: IllegalArgumentException =>
- throw new HiveSQLException("Error applying statement specific settings", e)
- }
- }
- }
- sqlOperationConf
- }
-
def run(): Unit = {
logInfo(s"Running query '$statement'")
- val opConfig: HiveConf = getConfigForOperation
setState(OperationState.RUNNING)
- setHasResultSet(true)
-
- if (!shouldRunAsync) {
- runInternal(statement)
- setState(OperationState.FINISHED)
- } else {
- val parentSessionState = SessionState.get
- val sessionHive: Hive = Hive.get
- val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
-
- val backgroundOperation: Runnable = new Runnable {
- def run() {
- val doAsAction: PrivilegedExceptionAction[AnyRef] =
- new PrivilegedExceptionAction[AnyRef] {
- def run: AnyRef = {
- Hive.set(sessionHive)
- SessionState.setCurrentSessionState(parentSessionState)
- try {
- runInternal(statement)
- }
- catch { case e: HiveSQLException =>
- setOperationException(e)
- logError("Error running hive query: ", e)
- }
- null
- }
- }
- try {
- ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
- }
- catch { case e: Exception =>
- setOperationException(new HiveSQLException(e))
- logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
- }
- setState(OperationState.FINISHED)
- }
+ try {
+ result = hiveContext.sql(statement)
+ logDebug(result.queryExecution.toString())
+ result.queryExecution.logical match {
+ case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+ sessionToActivePool(parentSession) = value
+ logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+ case _ =>
}
- try {
- val backgroundHandle: Future[_] = getParentSession.getSessionManager.
- submitBackgroundOperation(backgroundOperation)
- setBackgroundHandle(backgroundHandle)
- } catch {
- // Actually do need to catch Throwable as some failures don't inherit from Exception and
- // HiveServer will silently swallow them.
- case e: Throwable =>
- logError("Error executing query:",e)
- throw new HiveSQLException(e.toString)
+ val groupId = round(random * 1000000).toString
+ hiveContext.sparkContext.setJobGroup(groupId, statement)
+ sessionToActivePool.get(parentSession).foreach { pool =>
+ hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+ }
+ iter = {
+ val useIncrementalCollect =
+ hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+ if (useIncrementalCollect) {
+ result.toLocalIterator
+ } else {
+ result.collect().iterator
+ }
}
+ dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
+ setHasResultSet(true)
+ } catch {
+ // Actually do need to catch Throwable as some failures don't inherit from Exception and
+ // HiveServer will silently swallow them.
+ case e: Throwable =>
+ setState(OperationState.ERROR)
+ logError("Error executing query:", e)
+ throw new HiveSQLException(e.toString)
}
+ setState(OperationState.FINISHED)
}
}
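For context, the fetch logic retained in run() toggles on the spark.sql.thriftServer.incrementalCollect setting. A hedged sketch of the trade-off between the two strategies (`FetchStrategy` and `rowIterator` are illustrative names, not Spark source; `rdd` stands in for the SchemaRDD returned by hiveContext.sql):

import org.apache.spark.rdd.RDD

object FetchStrategy {
  def rowIterator[T](rdd: RDD[T], incrementalCollect: Boolean): Iterator[T] =
    if (incrementalCollect) {
      // Streams one partition at a time to the driver (one Spark job per
      // partition), bounding driver memory for large result sets.
      rdd.toLocalIterator
    } else {
      // Runs a single job and materializes the entire result on the driver,
      // which is faster for small results but can exhaust driver memory.
      rdd.collect().iterator
    }
}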