diff options
author | huangzhaowei <carlmartinmax@gmail.com> | 2015-11-24 23:24:49 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-11-24 23:24:49 +0800 |
commit | d4a5e6f719079639ffd38470f4d8d1f6fde3228d (patch) | |
tree | bff862f90ce80dc2110c2b53714d306c30f4d08e /sql/hive-thriftserver/src/main | |
parent | 800bd799acf7f10a469d8d6537279953129eb2c6 (diff) | |
download | spark-d4a5e6f719079639ffd38470f4d8d1f6fde3228d.tar.gz spark-d4a5e6f719079639ffd38470f4d8d1f6fde3228d.tar.bz2 spark-d4a5e6f719079639ffd38470f4d8d1f6fde3228d.zip |
[SPARK-11043][SQL] BugFix:Set the operator log in the thrift server.
`SessionManager` will set the `operationLog` if the configuration `hive.server2.logging.operation.enabled` is true in version of hive 1.2.1.
But the spark did not adapt to this change, so no matter enabled the configuration or not, spark thrift server will always log the warn message.
PS: if `hive.server2.logging.operation.enabled` is false, it should log the warn message (the same as hive thrift server).
Author: huangzhaowei <carlmartinmax@gmail.com>
Closes #9056 from SaintBacchus/SPARK-11043.
Diffstat (limited to 'sql/hive-thriftserver/src/main')
2 files changed, 9 insertions, 4 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 82fef92dcb..e022ee86a7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -134,12 +134,12 @@ private[hive] class SparkExecuteStatementOperation( def getResultSetSchema: TableSchema = resultSchema - override def run(): Unit = { + override def runInternal(): Unit = { setState(OperationState.PENDING) setHasResultSet(true) // avoid no resultset for async run if (!runInBackground) { - runInternal() + execute() } else { val sparkServiceUGI = Utils.getUGI() @@ -151,7 +151,7 @@ private[hive] class SparkExecuteStatementOperation( val doAsAction = new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { try { - runInternal() + execute() } catch { case e: HiveSQLException => setOperationException(e) @@ -188,7 +188,7 @@ private[hive] class SparkExecuteStatementOperation( } } - override def runInternal(): Unit = { + private def execute(): Unit = { statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index af4fcdf021..de4e9c62b5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -41,6 +41,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: override def init(hiveConf: HiveConf) { setSuperField(this, "hiveConf", hiveConf) + // Create operation log root directory, if operation logging is enabled + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { + invoke(classOf[SessionManager], this, "initOperationLogRootDir") + } + val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize)) getAncestorField[Log](this, 3, "LOG").info( |