aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala8
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala5
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala16
3 files changed, 24 insertions, 5 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 82fef92dcb..e022ee86a7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -134,12 +134,12 @@ private[hive] class SparkExecuteStatementOperation(
def getResultSetSchema: TableSchema = resultSchema
- override def run(): Unit = {
+ override def runInternal(): Unit = {
setState(OperationState.PENDING)
setHasResultSet(true) // avoid no resultset for async run
if (!runInBackground) {
- runInternal()
+ execute()
} else {
val sparkServiceUGI = Utils.getUGI()
@@ -151,7 +151,7 @@ private[hive] class SparkExecuteStatementOperation(
val doAsAction = new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
try {
- runInternal()
+ execute()
} catch {
case e: HiveSQLException =>
setOperationException(e)
@@ -188,7 +188,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}
- override def runInternal(): Unit = {
+ private def execute(): Unit = {
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index af4fcdf021..de4e9c62b5 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -41,6 +41,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
+ // Create operation log root directory, if operation logging is enabled
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+ invoke(classOf[SessionManager], this, "initOperationLogRootDir")
+ }
+
val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
getAncestorField[Log](this, 3, "LOG").info(
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 1dd898aa38..139d8e897b 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise, future}
+import scala.io.Source
import scala.util.{Random, Try}
import com.google.common.base.Charsets.UTF_8
@@ -507,6 +508,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
assert(rs2.getInt(2) === 500)
}
}
+
+ test("SPARK-11043 check operation log root directory") {
+ val expectedLine =
+ "Operation log root directory is created: " + operationLogPath.getAbsoluteFile
+ assert(Source.fromFile(logPath).getLines().exists(_.contains(expectedLine)))
+ }
}
class SingleSessionSuite extends HiveThriftJdbcTest {
@@ -642,7 +649,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
protected def metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
private val pidDir: File = Utils.createTempDir("thriftserver-pid")
- private var logPath: File = _
+ protected var logPath: File = _
+ protected var operationLogPath: File = _
private var logTailingProcess: Process = _
private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
@@ -679,6 +687,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
| --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
+ | --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
| --hiveconf $portConf=$port
| --driver-class-path $driverClassPath
| --driver-java-options -Dlog4j.debug
@@ -706,6 +715,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
warehousePath.delete()
metastorePath = Utils.createTempDir()
metastorePath.delete()
+ operationLogPath = Utils.createTempDir()
+ operationLogPath.delete()
logPath = null
logTailingProcess = null
@@ -782,6 +793,9 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
metastorePath.delete()
metastorePath = null
+ operationLogPath.delete()
+ operationLogPath = null
+
Option(logPath).foreach(_.delete())
logPath = null