aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhuangzhaowei <carlmartinmax@gmail.com>2015-11-24 23:24:49 +0800
committerCheng Lian <lian@databricks.com>2015-11-24 23:24:49 +0800
commitd4a5e6f719079639ffd38470f4d8d1f6fde3228d (patch)
treebff862f90ce80dc2110c2b53714d306c30f4d08e
parent800bd799acf7f10a469d8d6537279953129eb2c6 (diff)
downloadspark-d4a5e6f719079639ffd38470f4d8d1f6fde3228d.tar.gz
spark-d4a5e6f719079639ffd38470f4d8d1f6fde3228d.tar.bz2
spark-d4a5e6f719079639ffd38470f4d8d1f6fde3228d.zip
[SPARK-11043][SQL] BugFix: Set the operation log in the thrift server.
`SessionManager` sets the `operationLog` when the configuration `hive.server2.logging.operation.enabled` is true, as of Hive 1.2.1. However, Spark did not adapt to this change, so regardless of whether the configuration was enabled, the Spark thrift server would always log the warning message. PS: if `hive.server2.logging.operation.enabled` is false, it should still log the warning message (the same as the Hive thrift server). Author: huangzhaowei <carlmartinmax@gmail.com> Closes #9056 from SaintBacchus/SPARK-11043.
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala8
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala5
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala16
3 files changed, 24 insertions, 5 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 82fef92dcb..e022ee86a7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -134,12 +134,12 @@ private[hive] class SparkExecuteStatementOperation(
def getResultSetSchema: TableSchema = resultSchema
- override def run(): Unit = {
+ override def runInternal(): Unit = {
setState(OperationState.PENDING)
setHasResultSet(true) // avoid no resultset for async run
if (!runInBackground) {
- runInternal()
+ execute()
} else {
val sparkServiceUGI = Utils.getUGI()
@@ -151,7 +151,7 @@ private[hive] class SparkExecuteStatementOperation(
val doAsAction = new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
try {
- runInternal()
+ execute()
} catch {
case e: HiveSQLException =>
setOperationException(e)
@@ -188,7 +188,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}
- override def runInternal(): Unit = {
+ private def execute(): Unit = {
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index af4fcdf021..de4e9c62b5 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -41,6 +41,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
+ // Create operation log root directory, if operation logging is enabled
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+ invoke(classOf[SessionManager], this, "initOperationLogRootDir")
+ }
+
val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
getAncestorField[Log](this, 3, "LOG").info(
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 1dd898aa38..139d8e897b 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise, future}
+import scala.io.Source
import scala.util.{Random, Try}
import com.google.common.base.Charsets.UTF_8
@@ -507,6 +508,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
assert(rs2.getInt(2) === 500)
}
}
+
+ test("SPARK-11043 check operation log root directory") {
+ val expectedLine =
+ "Operation log root directory is created: " + operationLogPath.getAbsoluteFile
+ assert(Source.fromFile(logPath).getLines().exists(_.contains(expectedLine)))
+ }
}
class SingleSessionSuite extends HiveThriftJdbcTest {
@@ -642,7 +649,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
protected def metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
private val pidDir: File = Utils.createTempDir("thriftserver-pid")
- private var logPath: File = _
+ protected var logPath: File = _
+ protected var operationLogPath: File = _
private var logTailingProcess: Process = _
private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
@@ -679,6 +687,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
| --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
+ | --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
| --hiveconf $portConf=$port
| --driver-class-path $driverClassPath
| --driver-java-options -Dlog4j.debug
@@ -706,6 +715,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
warehousePath.delete()
metastorePath = Utils.createTempDir()
metastorePath.delete()
+ operationLogPath = Utils.createTempDir()
+ operationLogPath.delete()
logPath = null
logTailingProcess = null
@@ -782,6 +793,9 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
metastorePath.delete()
metastorePath = null
+ operationLogPath.delete()
+ operationLogPath = null
+
Option(logPath).foreach(_.delete())
logPath = null