From acb55aeddbe58758d75b9aed130634afe21797cf Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 10 Nov 2014 16:56:36 -0800 Subject: [SPARK-4308][SQL] Sets SQL operation state to ERROR when exception is thrown In `HiveThriftServer2`, when an exception is thrown during a SQL execution, the SQL operation state should be set to `ERROR`, but now it remains `RUNNING`. This affects the result of the `GetOperationStatus` Thrift API. Author: Cheng Lian Closes #3175 from liancheng/fix-op-state and squashes the following commits: 6d4c1fe [Cheng Lian] Sets SQL operation state to ERROR when exception is thrown --- .../hive/thriftserver/AbstractSparkSQLDriver.scala | 2 -- .../spark/sql/hive/thriftserver/Shim12.scala | 12 ++++---- .../spark/sql/hive/thriftserver/Shim13.scala | 36 +++++++++------------- 3 files changed, 21 insertions(+), 29 deletions(-) (limited to 'sql/hive-thriftserver') diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala index fcb302edbf..6ed8fd2768 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.hive.thriftserver import scala.collection.JavaConversions._ -import java.util.{ArrayList => JArrayList} - import org.apache.commons.lang.exception.ExceptionUtils import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema} import org.apache.hadoop.hive.ql.Driver diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala index e3ba9914c6..aa2e3cab72 100644 --- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala @@ -25,9 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap} import scala.math._ import org.apache.hadoop.hive.common.`type`.HiveDecimal -import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory import org.apache.hadoop.hive.shims.ShimLoader import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ @@ -37,9 +35,9 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.Logging import org.apache.spark.sql.catalyst.plans.logical.SetCommand import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD} -import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ +import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} +import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow} /** * A compatibility layer for interacting with Hive version 0.12.0. @@ -71,8 +69,9 @@ private[hive] class SparkExecuteStatementOperation( statement: String, confOverlay: JMap[String, String])( hiveContext: HiveContext, - sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation( - parentSession, statement, confOverlay) with Logging { + sessionToActivePool: SMap[HiveSession, String]) + extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging { + private var result: SchemaRDD = _ private var iter: Iterator[SparkRow] = _ private var dataTypes: Array[DataType] = _ @@ -216,6 +215,7 @@ private[hive] class SparkExecuteStatementOperation( // Actually do need to catch Throwable as some failures don't inherit from Exception and // HiveServer will silently swallow them. case e: Throwable => + setState(OperationState.ERROR) logError("Error executing query:",e) throw new HiveSQLException(e.toString) } diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index f2ceba8282..a642478d08 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -27,10 +27,9 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap} import scala.math._ import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.ql.metadata.Hive -import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.shims.ShimLoader import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ @@ -39,9 +38,9 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.Logging import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.{Row => SparkRow, SchemaRDD} -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ +import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} +import org.apache.spark.sql.{SchemaRDD, Row => SparkRow} /** * A compatibility layer for interacting with Hive version 0.12.0. @@ -100,6 +99,7 @@ private[hive] class SparkExecuteStatementOperation( // Actually do need to catch Throwable as some failures don't inherit from Exception and // HiveServer will silently swallow them. case e: Throwable => + setState(OperationState.ERROR) logError("Error executing query:",e) throw new HiveSQLException(e.toString) } @@ -194,14 +194,12 @@ private[hive] class SparkExecuteStatementOperation( try { sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue) } - catch { - case e: IllegalArgumentException => { - throw new HiveSQLException("Error applying statement specific settings", e) - } + catch { case e: IllegalArgumentException => + throw new HiveSQLException("Error applying statement specific settings", e) } } } - return sqlOperationConf + sqlOperationConf } def run(): Unit = { @@ -219,7 +217,7 @@ private[hive] class SparkExecuteStatementOperation( val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig) val backgroundOperation: Runnable = new Runnable { - def run { + def run() { val doAsAction: PrivilegedExceptionAction[AnyRef] = new PrivilegedExceptionAction[AnyRef] { def run: AnyRef = { @@ -228,23 +226,19 @@ private[hive] class SparkExecuteStatementOperation( try { runInternal(statement) } - catch { - case e: HiveSQLException => { - setOperationException(e) - logError("Error running hive query: ", e) - } + catch { case e: HiveSQLException => + setOperationException(e) + logError("Error running hive query: ", e) } - return null + null } } try { ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction) } - catch { - case e: Exception => { - setOperationException(new HiveSQLException(e)) - logError("Error running hive query as user : " + currentUGI.getShortUserName, e) - } + catch { case e: Exception => + setOperationException(new HiveSQLException(e)) + logError("Error running hive query as user : " + currentUGI.getShortUserName, e) } setState(OperationState.FINISHED) } -- cgit v1.2.3