Diffstat (limited to 'sql/hive-thriftserver/src/main')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala  37
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala  27
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala  9
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala  56
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala  13
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala  11
6 files changed, 115 insertions(+), 38 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala
new file mode 100644
index 0000000000..2228f651e2
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server
+
+import org.apache.hive.service.server.HiveServer2.{StartOptionExecutor, ServerOptionsProcessor}
+
+/**
+ * Class to upgrade a package-private class to public, and to
+ * implement a `process()` operation consistent with
+ * the behavior of older Hive versions.
+ * @param serverName name of the Hive server
+ */
+private[apache] class HiveServerServerOptionsProcessor(serverName: String)
+ extends ServerOptionsProcessor(serverName) {
+
+ def process(args: Array[String]): Boolean = {
+ // A parse failure automatically triggers a system exit
+ val response = super.parse(args)
+ val executor = response.getServerOptionsExecutor()
+ // return true if the parsed option was to start the service
+ executor.isInstanceOf[StartOptionExecutor]
+ }
+}
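
The shim above works because of JVM package visibility: compiling a subclass into org.apache.hive.service.server lets it reach HiveServer2's package-private ServerOptionsProcessor. A minimal, self-contained sketch of the same trick, with hypothetical demo names:

package demo {
  // Stand-in for a library class that is package-private (hypothetical).
  private[demo] class Options(val name: String) {
    protected def parseArgs(args: Array[String]): Boolean = args.nonEmpty
  }

  // A public subclass compiled into the same package regains access to it,
  // just as HiveServerServerOptionsProcessor does for ServerOptionsProcessor.
  class PublicOptions(name: String) extends Options(name) {
    def process(args: Array[String]): Boolean = parseArgs(args)
  }
}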
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index b7db80d93f..9c047347cb 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.hive.thriftserver
+import java.util.Locale
+import java.util.concurrent.atomic.AtomicBoolean
+
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -24,7 +27,7 @@ import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
-import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
+import org.apache.hive.service.server.{HiveServerServerOptionsProcessor, HiveServer2}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
@@ -65,7 +68,7 @@ object HiveThriftServer2 extends Logging {
}
def main(args: Array[String]) {
- val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
+ val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2")
if (!optionsProcessor.process(args)) {
System.exit(-1)
}
@@ -241,9 +244,12 @@ object HiveThriftServer2 extends Logging {
private[hive] class HiveThriftServer2(hiveContext: HiveContext)
extends HiveServer2
with ReflectedCompositeService {
+ // State is tracked internally so that the server only attempts to shut down if it started
+ // successfully, and then only once.
+ private val started = new AtomicBoolean(false)
override def init(hiveConf: HiveConf) {
- val sparkSqlCliService = new SparkSQLCLIService(hiveContext)
+ val sparkSqlCliService = new SparkSQLCLIService(this, hiveContext)
setSuperField(this, "cliService", sparkSqlCliService)
addService(sparkSqlCliService)
@@ -259,8 +265,19 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext)
}
private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
- val transportMode: String = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
- transportMode.equalsIgnoreCase("http")
+ val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
+ transportMode.toLowerCase(Locale.ENGLISH).equals("http")
+ }
+
+
+ override def start(): Unit = {
+ super.start()
+ started.set(true)
}
+ override def stop(): Unit = {
+ if (started.getAndSet(false)) {
+ super.stop()
+ }
+ }
}
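
Two things changed above: the transport-mode check now lowercases with an explicit Locale.ENGLISH (so the comparison behaves the same under locales like Turkish, where case-insensitive handling of "I" can surprise), and start/stop is guarded by an AtomicBoolean. A minimal sketch of the shutdown guard in isolation (GuardedService is a hypothetical name):

import java.util.concurrent.atomic.AtomicBoolean

class GuardedService {
  private val started = new AtomicBoolean(false)

  def start(): Unit = {
    // real startup work runs first; the flag flips only after it succeeds
    started.set(true)
  }

  def stop(): Unit = {
    // getAndSet is atomic: with concurrent callers, at most one sees true,
    // so shutdown runs at most once, and never before a successful start()
    if (started.getAndSet(false)) {
      println("shutting down")
    }
  }
}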
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e8758887ff..833bf62d47 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -32,8 +32,7 @@ import org.apache.hive.service.cli._
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.ShimLoader
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.hive.shims.Utils
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
@@ -146,7 +145,7 @@ private[hive] class SparkExecuteStatementOperation(
} else {
val parentSessionState = SessionState.get()
val hiveConf = getConfigForOperation()
- val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+ val sparkServiceUGI = Utils.getUGI()
val sessionHive = getCurrentHive()
val currentSqlSession = hiveContext.currentSession
@@ -174,7 +173,7 @@ private[hive] class SparkExecuteStatementOperation(
}
try {
- ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction)
+ sparkServiceUGI.doAs(doAsAction)
} catch {
case e: Exception =>
setOperationException(new HiveSQLException(e))
@@ -201,7 +200,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}
- private def runInternal(): Unit = {
+ override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
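
Hive 1.2 dropped the Hadoop shim layer, so the operation obtains a UserGroupInformation directly and calls doAs on it. A minimal sketch of the call pattern, using getCurrentUser as a stand-in for hive.shims.Utils.getUGI():

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object DoAsSketch {
  def main(args: Array[String]): Unit = {
    val ugi: UserGroupInformation = UserGroupInformation.getCurrentUser

    val doAsAction = new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        // work here executes under ugi's credentials
        println(s"running as ${UserGroupInformation.getCurrentUser.getUserName}")
      }
    }

    // replaces ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction)
    ugi.doAs(doAsAction)
  }
}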
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index f66a17b209..d3886142b3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive.thriftserver
import scala.collection.JavaConversions._
import java.io._
-import java.util.{ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, Locale}
-import jline.{ConsoleReader, History}
+import jline.console.ConsoleReader
+import jline.console.history.FileHistory
import org.apache.commons.lang3.StringUtils
import org.apache.commons.logging.LogFactory
@@ -40,6 +41,10 @@ import org.apache.spark.Logging
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.util.Utils
+/**
+ * This code doesn't support remote connections in Hive 1.2+, as the underlying CliDriver
+ * has dropped its support.
+ */
private[hive] object SparkSQLCLIDriver extends Logging {
private var prompt = "spark-sql"
private var continuedPrompt = "".padTo(prompt.length, ' ')
@@ -111,16 +116,9 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// Clean up after we exit
Utils.addShutdownHook { () => SparkSQLEnv.stop() }
+ val remoteMode = isRemoteMode(sessionState)
// "-h" option has been passed, so connect to Hive thrift server.
- if (sessionState.getHost != null) {
- sessionState.connect()
- if (sessionState.isRemoteMode) {
- prompt = s"[${sessionState.getHost}:${sessionState.getPort}]" + prompt
- continuedPrompt = "".padTo(prompt.length, ' ')
- }
- }
-
- if (!sessionState.isRemoteMode) {
+ if (!remoteMode) {
// Hadoop-20 and above - we need to augment classpath using hiveconf
// components.
// See also: code in ExecDriver.java
@@ -131,6 +129,9 @@ private[hive] object SparkSQLCLIDriver extends Logging {
}
conf.setClassLoader(loader)
Thread.currentThread().setContextClassLoader(loader)
+ } else {
+ // Remote mode is not supported in the Hive 1.2+ CLI
+ throw new RuntimeException("Remote operations not supported")
}
val cli = new SparkSQLCLIDriver
@@ -171,14 +172,14 @@ private[hive] object SparkSQLCLIDriver extends Logging {
val reader = new ConsoleReader()
reader.setBellEnabled(false)
// reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)))
- CliDriver.getCommandCompletor.foreach((e) => reader.addCompletor(e))
+ CliDriver.getCommandCompleter.foreach((e) => reader.addCompleter(e))
val historyDirectory = System.getProperty("user.home")
try {
if (new File(historyDirectory).exists()) {
val historyFile = historyDirectory + File.separator + ".hivehistory"
- reader.setHistory(new History(new File(historyFile)))
+ reader.setHistory(new FileHistory(new File(historyFile)))
} else {
logWarning("WARNING: Directory for Hive history file: " + historyDirectory +
" does not exist. History will not be available during this session.")
@@ -190,10 +191,14 @@ private[hive] object SparkSQLCLIDriver extends Logging {
logWarning(e.getMessage)
}
+ // TODO: missing
+/*
val clientTransportTSocketField = classOf[CliSessionState].getDeclaredField("transport")
clientTransportTSocketField.setAccessible(true)
transport = clientTransportTSocketField.get(sessionState).asInstanceOf[TSocket]
+*/
+ transport = null
var ret = 0
var prefix = ""
@@ -230,6 +235,13 @@ private[hive] object SparkSQLCLIDriver extends Logging {
System.exit(ret)
}
+
+
+ def isRemoteMode(state: CliSessionState): Boolean = {
+ // sessionState.isRemoteMode
+ state.isHiveServerQuery
+ }
+
}
private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
@@ -239,25 +251,33 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
private val console = new SessionState.LogHelper(LOG)
+ private val isRemoteMode = {
+ SparkSQLCLIDriver.isRemoteMode(sessionState)
+ }
+
private val conf: Configuration =
if (sessionState != null) sessionState.getConf else new Configuration()
// Force initializing SparkSQLEnv. This is put here but not object SparkSQLCliDriver
// because the Hive unit tests do not go through the main() code path.
- if (!sessionState.isRemoteMode) {
+ if (!isRemoteMode) {
SparkSQLEnv.init()
+ } else {
+ // Remote mode is not supported in the Hive 1.2+ CLI
+ throw new RuntimeException("Remote operations not supported")
}
override def processCmd(cmd: String): Int = {
val cmd_trimmed: String = cmd.trim()
+ val cmd_lower = cmd_trimmed.toLowerCase(Locale.ENGLISH)
val tokens: Array[String] = cmd_trimmed.split("\\s+")
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
- if (cmd_trimmed.toLowerCase.equals("quit") ||
- cmd_trimmed.toLowerCase.equals("exit") ||
- tokens(0).equalsIgnoreCase("source") ||
+ if (cmd_lower.equals("quit") ||
+ cmd_lower.equals("exit") ||
+ tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
cmd_trimmed.startsWith("!") ||
tokens(0).toLowerCase.equals("list") ||
- sessionState.isRemoteMode) {
+ isRemoteMode) {
val start = System.currentTimeMillis()
super.processCmd(cmd)
val end = System.currentTimeMillis()
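
The CLI edits above track the move from jline 0.9.x to jline2: ConsoleReader now lives in jline.console, the old History class is replaced by FileHistory, and addCompletor was renamed addCompleter. A minimal sketch of the jline2 setup; note that FileHistory buffers entries in memory, so an explicit flush() is needed to persist them:

import java.io.File
import jline.console.ConsoleReader
import jline.console.history.FileHistory

object JLine2Sketch {
  def main(args: Array[String]): Unit = {
    val reader = new ConsoleReader()
    reader.setBellEnabled(false)

    val historyFile = new File(System.getProperty("user.home"), ".hivehistory")
    val history = new FileHistory(historyFile)
    reader.setHistory(history)

    // jline2 keeps history in memory; flush on exit so it survives the session
    sys.addShutdownHook(history.flush())
  }
}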
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
index 41f647d5f8..644165acf7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -23,11 +23,12 @@ import javax.security.auth.login.LoginException
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.hive.shims.Utils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
+import org.apache.hive.service.server.HiveServer2
import org.apache.hive.service.{AbstractService, Service, ServiceException}
import org.apache.spark.sql.hive.HiveContext
@@ -35,22 +36,22 @@ import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import scala.collection.JavaConversions._
-private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
- extends CLIService
+private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: HiveContext)
+ extends CLIService(hiveServer)
with ReflectedCompositeService {
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
- val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
+ val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, hiveContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
var sparkServiceUGI: UserGroupInformation = null
- if (ShimLoader.getHadoopShims.isSecurityEnabled) {
+ if (UserGroupInformation.isSecurityEnabled) {
try {
HiveAuthFactory.loginFromKeytab(hiveConf)
- sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+ sparkServiceUGI = Utils.getUGI()
setSuperField(this, "serviceUGI", sparkServiceUGI)
} catch {
case e @ (_: IOException | _: LoginException) =>
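
SparkSQLCLIService cannot reach the private state that Hive's own CLIService.init would set up, so ReflectedCompositeService's setSuperField writes those superclass fields reflectively. A sketch of what such a helper plausibly looks like (the real ReflectionUtils in Spark may differ, e.g. by walking further up the class hierarchy):

object ReflectionSketch {
  // Write a private field declared on the direct superclass of obj.
  def setSuperField(obj: AnyRef, fieldName: String, value: AnyRef): Unit = {
    val field = obj.getClass.getSuperclass.getDeclaredField(fieldName)
    field.setAccessible(true)
    field.set(obj, value)
  }
}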
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 2d5ee68002..92ac0ec3fc 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -25,14 +25,15 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.SessionHandle
import org.apache.hive.service.cli.session.SessionManager
import org.apache.hive.service.cli.thrift.TProtocolVersion
+import org.apache.hive.service.server.HiveServer2
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
-private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
- extends SessionManager
+private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: HiveContext)
+ extends SessionManager(hiveServer)
with ReflectedCompositeService {
private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
@@ -55,12 +56,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
protocol: TProtocolVersion,
username: String,
passwd: String,
+ ipAddress: String,
sessionConf: java.util.Map[String, String],
withImpersonation: Boolean,
delegationToken: String): SessionHandle = {
hiveContext.openSession()
- val sessionHandle = super.openSession(
- protocol, username, passwd, sessionConf, withImpersonation, delegationToken)
+ val sessionHandle =
+ super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation,
+ delegationToken)
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
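
In Hive 1.2, SessionManager.openSession gained an ipAddress parameter, so the override must accept it and forward every argument to super. A simplified sketch of the forwarding pattern with stand-in types (Base plays the role of Hive's SessionManager):

class Base {
  def openSession(user: String, passwd: String, ipAddress: String): String =
    s"handle:$user@$ipAddress"
}

class Sub extends Base {
  override def openSession(user: String, passwd: String, ipAddress: String): String = {
    // session bookkeeping (e.g. hiveContext.openSession()) would run first
    super.openSession(user, passwd, ipAddress)
  }
}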