aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2014-11-10 16:56:36 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-11-28 12:00:05 -0500
commit7fa5fff29881421ef5da0ac3c254611b2318be00 (patch)
tree28312fc34ef9a6709e77cbe0ed7649b87573e0a1 /sql/hive-thriftserver
parente9244263c97b61560e30dcb997df4bf074299085 (diff)
downloadspark-7fa5fff29881421ef5da0ac3c254611b2318be00.tar.gz
spark-7fa5fff29881421ef5da0ac3c254611b2318be00.tar.bz2
spark-7fa5fff29881421ef5da0ac3c254611b2318be00.zip
[SPARK-4308][SQL] Sets SQL operation state to ERROR when exception is thrown
In `HiveThriftServer2`, when an exception is thrown during a SQL execution, the SQL operation state should be set to `ERROR`, but now it remains `RUNNING`. This affects the result of the `GetOperationStatus` Thrift API. Author: Cheng Lian <lian@databricks.com> Closes #3175 from liancheng/fix-op-state and squashes the following commits: 6d4c1fe [Cheng Lian] Sets SQL operation state to ERROR when exception is thrown
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala2
-rw-r--r--sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala12
-rw-r--r--sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala36
3 files changed, 21 insertions, 29 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
index fcb302edbf..6ed8fd2768 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive.thriftserver
import scala.collection.JavaConversions._
-import java.util.{ArrayList => JArrayList}
-
import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hadoop.hive.ql.Driver
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index e94017ea31..9258ad0cdf 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -25,9 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.math._
import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
@@ -37,9 +35,9 @@ import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
-import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow}
/**
* A compatibility layer for interacting with Hive version 0.12.0.
@@ -71,8 +69,9 @@ private[hive] class SparkExecuteStatementOperation(
statement: String,
confOverlay: JMap[String, String])(
hiveContext: HiveContext,
- sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
- parentSession, statement, confOverlay) with Logging {
+ sessionToActivePool: SMap[HiveSession, String])
+ extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging {
+
private var result: SchemaRDD = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
@@ -217,6 +216,7 @@ private[hive] class SparkExecuteStatementOperation(
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
+ setState(OperationState.ERROR)
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 6b9d18d0bb..99c1987158 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -27,10 +27,9 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.math._
import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
@@ -39,9 +38,9 @@ import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.{Row => SparkRow, SchemaRDD}
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
/**
* A compatibility layer for interacting with Hive version 0.13.1.
@@ -100,6 +99,7 @@ private[hive] class SparkExecuteStatementOperation(
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
+ setState(OperationState.ERROR)
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
@@ -191,14 +191,12 @@ private[hive] class SparkExecuteStatementOperation(
try {
sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
}
- catch {
- case e: IllegalArgumentException => {
- throw new HiveSQLException("Error applying statement specific settings", e)
- }
+ catch { case e: IllegalArgumentException =>
+ throw new HiveSQLException("Error applying statement specific settings", e)
}
}
}
- return sqlOperationConf
+ sqlOperationConf
}
def run(): Unit = {
@@ -216,7 +214,7 @@ private[hive] class SparkExecuteStatementOperation(
val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
val backgroundOperation: Runnable = new Runnable {
- def run {
+ def run() {
val doAsAction: PrivilegedExceptionAction[AnyRef] =
new PrivilegedExceptionAction[AnyRef] {
def run: AnyRef = {
@@ -225,23 +223,19 @@ private[hive] class SparkExecuteStatementOperation(
try {
runInternal(statement)
}
- catch {
- case e: HiveSQLException => {
- setOperationException(e)
- logError("Error running hive query: ", e)
- }
+ catch { case e: HiveSQLException =>
+ setOperationException(e)
+ logError("Error running hive query: ", e)
}
- return null
+ null
}
}
try {
ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
}
- catch {
- case e: Exception => {
- setOperationException(new HiveSQLException(e))
- logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
- }
+ catch { case e: Exception =>
+ setOperationException(new HiveSQLException(e))
+ logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
}
setState(OperationState.FINISHED)
}