aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver
diff options
context:
space:
mode:
authorSteve Loughran <stevel@hortonworks.com>2015-08-03 15:24:34 -0700
committerMichael Armbrust <michael@databricks.com>2015-08-03 15:24:42 -0700
commita2409d1c8e8ddec04b529ac6f6a12b5993f0eeda (patch)
treed0df37d141895084eb52875f324141de42108605 /sql/hive-thriftserver
parentb2e4b85d2db0320e9cbfaf5a5542f749f1f11cf4 (diff)
downloadspark-a2409d1c8e8ddec04b529ac6f6a12b5993f0eeda.tar.gz
spark-a2409d1c8e8ddec04b529ac6f6a12b5993f0eeda.tar.bz2
spark-a2409d1c8e8ddec04b529ac6f6a12b5993f0eeda.zip
[SPARK-8064] [SQL] Build against Hive 1.2.1
Cherry picked the parts of the initial SPARK-8064 WiP branch needed to get sql/hive to compile against hive 1.2.1. That's the ASF release packaged under org.apache.hive, not any fork. Tests not run yet: that's what the machines are for Author: Steve Loughran <stevel@hortonworks.com> Author: Cheng Lian <lian@databricks.com> Author: Michael Armbrust <michael@databricks.com> Author: Patrick Wendell <patrick@databricks.com> Closes #7191 from steveloughran/stevel/feature/SPARK-8064-hive-1.2-002 and squashes the following commits: 7556d85 [Cheng Lian] Updates .q files and corresponding golden files ef4af62 [Steve Loughran] Merge commit '6a92bb09f46a04d6cd8c41bdba3ecb727ebb9030' into stevel/feature/SPARK-8064-hive-1.2-002 6a92bb0 [Cheng Lian] Overrides HiveConf time vars dcbb391 [Cheng Lian] Adds com.twitter:parquet-hadoop-bundle:1.6.0 for Hive Parquet SerDe 0bbe475 [Steve Loughran] SPARK-8064 scalastyle rejects the standard Hadoop ASF license header... fdf759b [Steve Loughran] SPARK-8064 classpath dependency suite to be in sync with shading in final (?) hive-exec spark 7a6c727 [Steve Loughran] SPARK-8064 switch to second staging repo of the spark-hive artifacts. This one has the protobuf-shaded hive-exec jar 376c003 [Steve Loughran] SPARK-8064 purge duplicate protobuf declaration 2c74697 [Steve Loughran] SPARK-8064 switch to the protobuf shaded hive-exec jar with tests to chase it down cc44020 [Steve Loughran] SPARK-8064 remove hadoop.version from runtest.py, as profile will fix that automatically. 6901fa9 [Steve Loughran] SPARK-8064 explicit protobuf import da310dc [Michael Armbrust] Fixes for Hive tests. a775a75 [Steve Loughran] SPARK-8064 cherry-pick-incomplete 7404f34 [Patrick Wendell] Add spark-hive staging repo 832c164 [Steve Loughran] SPARK-8064 try to supress compiler warnings on Complex.java pasted-thrift-code 312c0d4 [Steve Loughran] SPARK-8064 maven/ivy dependency purge; calcite declaration needed fa5ae7b [Steve Loughran] HIVE-8064 fix up hive-thriftserver dependencies and cut back on evicted references in the hive- packages; this keeps mvn and ivy resolution compatible, as the reconciliation policy is "by hand" c188048 [Steve Loughran] SPARK-8064 manage the Hive depencencies to that -things that aren't needed are excluded -sql/hive built with ivy is in sync with the maven reconciliation policy, rather than latest-first 4c8be8d [Cheng Lian] WIP: Partial fix for Thrift server and CLI tests 314eb3c [Steve Loughran] SPARK-8064 deprecation warning noise in one of the tests 17b0341 [Steve Loughran] SPARK-8064 IDE-hinted cleanups of Complex.java to reduce compiler warnings. It's all autogenerated code, so still ugly. d029b92 [Steve Loughran] SPARK-8064 rely on unescaping to have already taken place, so go straight to map of serde options 23eca7e [Steve Loughran] HIVE-8064 handle raw and escaped property tokens 54d9b06 [Steve Loughran] SPARK-8064 fix compilation regression surfacing from rebase 0b12d5f [Steve Loughran] HIVE-8064 use subset of hive complex type whose types deserialize fce73b6 [Steve Loughran] SPARK-8064 poms rely implicitly on the version of kryo chill provides fd3aa5d [Steve Loughran] SPARK-8064 version of hive to d/l from ivy is 1.2.1 dc73ece [Steve Loughran] SPARK-8064 revert to master's determinstic pushdown strategy d3c1e4a [Steve Loughran] SPARK-8064 purge UnionType 051cc21 [Steve Loughran] SPARK-8064 switch to an unshaded version of hive-exec-core, which must have been built with Kryo 2.21. This currently looks for a (locally built) version 1.2.1.spark 6684c60 [Steve Loughran] SPARK-8064 ignore RTE raised in blocking process.exitValue() call e6121e5 [Steve Loughran] SPARK-8064 address review comments aa43dc6 [Steve Loughran] SPARK-8064 more robust teardown on JavaMetastoreDatasourcesSuite f2bff01 [Steve Loughran] SPARK-8064 better takeup of asynchronously caught error text 8b1ef38 [Steve Loughran] SPARK-8064: on failures executing spark-submit in HiveSparkSubmitSuite, print command line and all logged output. 5a9ce6b [Steve Loughran] SPARK-8064 add explicit reason for kv split failure, rather than array OOB. *does not address the issue* 642b63a [Steve Loughran] SPARK-8064 reinstate something cut briefly during rebasing 97194dc [Steve Loughran] SPARK-8064 add extra logging to the YarnClusterSuite classpath test. There should be no reason why this is failing on jenkins, but as it is (and presumably its CP-related), improve the logging including any exception raised. 335357f [Steve Loughran] SPARK-8064 fail fast on thrive process spawning tests on exit codes and/or error string patterns seen in log. 3ed872f [Steve Loughran] SPARK-8064 rename field double to dbl bca55e5 [Steve Loughran] SPARK-8064 missed one of the `date` escapes 41d6479 [Steve Loughran] SPARK-8064 wrap tests with withTable() calls to avoid table-exists exceptions 2bc29a4 [Steve Loughran] SPARK-8064 ParquetSuites to escape `date` field name 1ab9bc4 [Steve Loughran] SPARK-8064 TestHive to use sered2.thrift.test.Complex bf3a249 [Steve Loughran] SPARK-8064: more resubmit than fix; tighten startup timeout to 60s. Still no obvious reason why jersey server code in spark-assembly isn't being picked up -it hasn't been shaded c829b8f [Steve Loughran] SPARK-8064: reinstate yarn-rm-server dependencies to hive-exec to ensure that jersey server is on classpath on hadoop versions < 2.6 0b0f738 [Steve Loughran] SPARK-8064: thrift server startup to fail fast on any exception in the main thread 13abaf1 [Steve Loughran] SPARK-8064 Hive compatibilty tests sin sync with explain/show output from Hive 1.2.1 d14d5ea [Steve Loughran] SPARK-8064: DATE is now a predicate; you can't use it as a field in select ops 26eef1c [Steve Loughran] SPARK-8064: HIVE-9039 renamed TOK_UNION => TOK_UNIONALL while adding TOK_UNIONDISTINCT 3d64523 [Steve Loughran] SPARK-8064 improve diagns on uknown token; fix scalastyle failure d0360f6 [Steve Loughran] SPARK-8064: delicate merge in of the branch vanzin/hive-1.1 1126e5a [Steve Loughran] SPARK-8064: name of unrecognized file format wasn't appearing in error text 8cb09c4 [Steve Loughran] SPARK-8064: test resilience/assertion improvements. Independent of the rest of the work; can be backported to earlier versions dec12cb [Steve Loughran] SPARK-8064: when a CLI suite test fails include the full output text in the raised exception; this ensures that the stdout/stderr is included in jenkins reports, so it becomes possible to diagnose the cause. 463a670 [Steve Loughran] SPARK-8064 run-tests.py adds a hadoop-2.6 profile, and changes info messages to say "w/Hive 1.2.1" in console output 2531099 [Steve Loughran] SPARK-8064 successful attempt to get rid of pentaho as a transitive dependency of hive-exec 1d59100 [Steve Loughran] SPARK-8064 (unsuccessful) attempt to get rid of pentaho as a transitive dependency of hive-exec 75733fc [Steve Loughran] SPARK-8064 change thrift binary startup message to "Starting ThriftBinaryCLIService on port" 3ebc279 [Steve Loughran] SPARK-8064 move strings used to check for http/bin thrift services up into constants c80979d [Steve Loughran] SPARK-8064: SparkSQLCLIDriver drops remote mode support. CLISuite Tests pass instead of timing out: undetected regression? 27e8370 [Steve Loughran] SPARK-8064 fix some style & IDE warnings 00e50d6 [Steve Loughran] SPARK-8064 stop excluding hive shims from dependency (commented out , for now) cb4f142 [Steve Loughran] SPARK-8054 cut pentaho dependency from calcite f7aa9cb [Steve Loughran] SPARK-8064 everything compiles with some commenting and moving of classes into a hive package 6c310b4 [Steve Loughran] SPARK-8064 subclass Hive ServerOptionsProcessor to make it public again f61a675 [Steve Loughran] SPARK-8064 thrift server switched to Hive 1.2.1, though it doesn't compile everywhere 4890b9d [Steve Loughran] SPARK-8064, build against Hive 1.2.1
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--sql/hive-thriftserver/pom.xml22
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala37
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala27
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala9
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala56
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala13
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala11
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala75
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala40
9 files changed, 222 insertions, 68 deletions
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 73e6ccdb1e..2dfbcb2425 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -62,19 +62,29 @@
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
+ <artifactId>hive-service</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${hive.group}</groupId>
<artifactId>hive-beeline</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </dependency>
<!-- Added for selenium: -->
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-java</artifactId>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
<build>
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala
new file mode 100644
index 0000000000..2228f651e2
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server
+
+import org.apache.hive.service.server.HiveServer2.{StartOptionExecutor, ServerOptionsProcessor}
+
+/**
+ * Class to upgrade a package-private class to public, and
+ * implement a `process()` operation consistent with
+ * the behavior of older Hive versions
+ * @param serverName name of the hive server
+ */
+private[apache] class HiveServerServerOptionsProcessor(serverName: String)
+ extends ServerOptionsProcessor(serverName) {
+
+ def process(args: Array[String]): Boolean = {
+ // A parse failure automatically triggers a system exit
+ val response = super.parse(args)
+ val executor = response.getServerOptionsExecutor()
+ // return true if the parsed option was to start the service
+ executor.isInstanceOf[StartOptionExecutor]
+ }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index b7db80d93f..9c047347cb 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.hive.thriftserver
+import java.util.Locale
+import java.util.concurrent.atomic.AtomicBoolean
+
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -24,7 +27,7 @@ import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
-import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
+import org.apache.hive.service.server.{HiveServerServerOptionsProcessor, HiveServer2}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
@@ -65,7 +68,7 @@ object HiveThriftServer2 extends Logging {
}
def main(args: Array[String]) {
- val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
+ val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2")
if (!optionsProcessor.process(args)) {
System.exit(-1)
}
@@ -241,9 +244,12 @@ object HiveThriftServer2 extends Logging {
private[hive] class HiveThriftServer2(hiveContext: HiveContext)
extends HiveServer2
with ReflectedCompositeService {
+ // state is tracked internally so that the server only attempts to shut down if it successfully
+ // started, and then once only.
+ private val started = new AtomicBoolean(false)
override def init(hiveConf: HiveConf) {
- val sparkSqlCliService = new SparkSQLCLIService(hiveContext)
+ val sparkSqlCliService = new SparkSQLCLIService(this, hiveContext)
setSuperField(this, "cliService", sparkSqlCliService)
addService(sparkSqlCliService)
@@ -259,8 +265,19 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext)
}
private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
- val transportMode: String = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
- transportMode.equalsIgnoreCase("http")
+ val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
+ transportMode.toLowerCase(Locale.ENGLISH).equals("http")
+ }
+
+
+ override def start(): Unit = {
+ super.start()
+ started.set(true)
}
+ override def stop(): Unit = {
+ if (started.getAndSet(false)) {
+ super.stop()
+ }
+ }
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e8758887ff..833bf62d47 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -32,8 +32,7 @@ import org.apache.hive.service.cli._
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.ShimLoader
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.hive.shims.Utils
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
@@ -146,7 +145,7 @@ private[hive] class SparkExecuteStatementOperation(
} else {
val parentSessionState = SessionState.get()
val hiveConf = getConfigForOperation()
- val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+ val sparkServiceUGI = Utils.getUGI()
val sessionHive = getCurrentHive()
val currentSqlSession = hiveContext.currentSession
@@ -174,7 +173,7 @@ private[hive] class SparkExecuteStatementOperation(
}
try {
- ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction)
+ sparkServiceUGI.doAs(doAsAction)
} catch {
case e: Exception =>
setOperationException(new HiveSQLException(e))
@@ -201,7 +200,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}
- private def runInternal(): Unit = {
+ override def runInternal(): Unit = {
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index f66a17b209..d3886142b3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive.thriftserver
import scala.collection.JavaConversions._
import java.io._
-import java.util.{ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, Locale}
-import jline.{ConsoleReader, History}
+import jline.console.ConsoleReader
+import jline.console.history.FileHistory
import org.apache.commons.lang3.StringUtils
import org.apache.commons.logging.LogFactory
@@ -40,6 +41,10 @@ import org.apache.spark.Logging
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.util.Utils
+/**
+ * This code doesn't support remote connections in Hive 1.2+, as the underlying CliDriver
+ * has dropped its support.
+ */
private[hive] object SparkSQLCLIDriver extends Logging {
private var prompt = "spark-sql"
private var continuedPrompt = "".padTo(prompt.length, ' ')
@@ -111,16 +116,9 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// Clean up after we exit
Utils.addShutdownHook { () => SparkSQLEnv.stop() }
+ val remoteMode = isRemoteMode(sessionState)
// "-h" option has been passed, so connect to Hive thrift server.
- if (sessionState.getHost != null) {
- sessionState.connect()
- if (sessionState.isRemoteMode) {
- prompt = s"[${sessionState.getHost}:${sessionState.getPort}]" + prompt
- continuedPrompt = "".padTo(prompt.length, ' ')
- }
- }
-
- if (!sessionState.isRemoteMode) {
+ if (!remoteMode) {
// Hadoop-20 and above - we need to augment classpath using hiveconf
// components.
// See also: code in ExecDriver.java
@@ -131,6 +129,9 @@ private[hive] object SparkSQLCLIDriver extends Logging {
}
conf.setClassLoader(loader)
Thread.currentThread().setContextClassLoader(loader)
+ } else {
+ // Hive 1.2 + not supported in CLI
+ throw new RuntimeException("Remote operations not supported")
}
val cli = new SparkSQLCLIDriver
@@ -171,14 +172,14 @@ private[hive] object SparkSQLCLIDriver extends Logging {
val reader = new ConsoleReader()
reader.setBellEnabled(false)
// reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)))
- CliDriver.getCommandCompletor.foreach((e) => reader.addCompletor(e))
+ CliDriver.getCommandCompleter.foreach((e) => reader.addCompleter(e))
val historyDirectory = System.getProperty("user.home")
try {
if (new File(historyDirectory).exists()) {
val historyFile = historyDirectory + File.separator + ".hivehistory"
- reader.setHistory(new History(new File(historyFile)))
+ reader.setHistory(new FileHistory(new File(historyFile)))
} else {
logWarning("WARNING: Directory for Hive history file: " + historyDirectory +
" does not exist. History will not be available during this session.")
@@ -190,10 +191,14 @@ private[hive] object SparkSQLCLIDriver extends Logging {
logWarning(e.getMessage)
}
+ // TODO: missing
+/*
val clientTransportTSocketField = classOf[CliSessionState].getDeclaredField("transport")
clientTransportTSocketField.setAccessible(true)
transport = clientTransportTSocketField.get(sessionState).asInstanceOf[TSocket]
+*/
+ transport = null
var ret = 0
var prefix = ""
@@ -230,6 +235,13 @@ private[hive] object SparkSQLCLIDriver extends Logging {
System.exit(ret)
}
+
+
+ def isRemoteMode(state: CliSessionState): Boolean = {
+ // sessionState.isRemoteMode
+ state.isHiveServerQuery
+ }
+
}
private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
@@ -239,25 +251,33 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
private val console = new SessionState.LogHelper(LOG)
+ private val isRemoteMode = {
+ SparkSQLCLIDriver.isRemoteMode(sessionState)
+ }
+
private val conf: Configuration =
if (sessionState != null) sessionState.getConf else new Configuration()
// Force initializing SparkSQLEnv. This is put here but not object SparkSQLCliDriver
// because the Hive unit tests do not go through the main() code path.
- if (!sessionState.isRemoteMode) {
+ if (!isRemoteMode) {
SparkSQLEnv.init()
+ } else {
+ // Hive 1.2 + not supported in CLI
+ throw new RuntimeException("Remote operations not supported")
}
override def processCmd(cmd: String): Int = {
val cmd_trimmed: String = cmd.trim()
+ val cmd_lower = cmd_trimmed.toLowerCase(Locale.ENGLISH)
val tokens: Array[String] = cmd_trimmed.split("\\s+")
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
- if (cmd_trimmed.toLowerCase.equals("quit") ||
- cmd_trimmed.toLowerCase.equals("exit") ||
- tokens(0).equalsIgnoreCase("source") ||
+ if (cmd_lower.equals("quit") ||
+ cmd_lower.equals("exit") ||
+ tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
cmd_trimmed.startsWith("!") ||
tokens(0).toLowerCase.equals("list") ||
- sessionState.isRemoteMode) {
+ isRemoteMode) {
val start = System.currentTimeMillis()
super.processCmd(cmd)
val end = System.currentTimeMillis()
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
index 41f647d5f8..644165acf7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -23,11 +23,12 @@ import javax.security.auth.login.LoginException
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.hive.shims.Utils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
+import org.apache.hive.service.server.HiveServer2
import org.apache.hive.service.{AbstractService, Service, ServiceException}
import org.apache.spark.sql.hive.HiveContext
@@ -35,22 +36,22 @@ import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import scala.collection.JavaConversions._
-private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
- extends CLIService
+private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: HiveContext)
+ extends CLIService(hiveServer)
with ReflectedCompositeService {
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
- val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
+ val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, hiveContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
var sparkServiceUGI: UserGroupInformation = null
- if (ShimLoader.getHadoopShims.isSecurityEnabled) {
+ if (UserGroupInformation.isSecurityEnabled) {
try {
HiveAuthFactory.loginFromKeytab(hiveConf)
- sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+ sparkServiceUGI = Utils.getUGI()
setSuperField(this, "serviceUGI", sparkServiceUGI)
} catch {
case e @ (_: IOException | _: LoginException) =>
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 2d5ee68002..92ac0ec3fc 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -25,14 +25,15 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.SessionHandle
import org.apache.hive.service.cli.session.SessionManager
import org.apache.hive.service.cli.thrift.TProtocolVersion
+import org.apache.hive.service.server.HiveServer2
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
-private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
- extends SessionManager
+private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: HiveContext)
+ extends SessionManager(hiveServer)
with ReflectedCompositeService {
private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
@@ -55,12 +56,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
protocol: TProtocolVersion,
username: String,
passwd: String,
+ ipAddress: String,
sessionConf: java.util.Map[String, String],
withImpersonation: Boolean,
delegationToken: String): SessionHandle = {
hiveContext.openSession()
- val sessionHandle = super.openSession(
- protocol, username, passwd, sessionConf, withImpersonation, delegationToken)
+ val sessionHandle =
+ super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation,
+ delegationToken)
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index df80d04b40..121b3e077f 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}
import scala.sys.process.{Process, ProcessLogger}
+import scala.util.Failure
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.BeforeAndAfter
@@ -37,31 +38,46 @@ import org.apache.spark.util.Utils
class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
val warehousePath = Utils.createTempDir()
val metastorePath = Utils.createTempDir()
+ val scratchDirPath = Utils.createTempDir()
before {
- warehousePath.delete()
- metastorePath.delete()
+ warehousePath.delete()
+ metastorePath.delete()
+ scratchDirPath.delete()
}
after {
- warehousePath.delete()
- metastorePath.delete()
+ warehousePath.delete()
+ metastorePath.delete()
+ scratchDirPath.delete()
}
+ /**
+ * Run a CLI operation and expect all the queries and expected answers to be returned.
+ * @param timeout maximum time for the commands to complete
+ * @param extraArgs any extra arguments
+ * @param errorResponses a sequence of strings whose presence in the stdout of the forked process
+ * is taken as an immediate error condition. That is: if a line beginning
+ * with one of these strings is found, fail the test immediately.
+ * The default value is `Seq("Error:")`
+ *
+ * @param queriesAndExpectedAnswers one or more tupes of query + answer
+ */
def runCliWithin(
timeout: FiniteDuration,
- extraArgs: Seq[String] = Seq.empty)(
+ extraArgs: Seq[String] = Seq.empty,
+ errorResponses: Seq[String] = Seq("Error:"))(
queriesAndExpectedAnswers: (String, String)*): Unit = {
val (queries, expectedAnswers) = queriesAndExpectedAnswers.unzip
- val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator)
-
val command = {
+ val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator)
val jdbcUrl = s"jdbc:derby:;databaseName=$metastorePath;create=true"
s"""$cliScript
| --master local
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
+ | --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath
""".stripMargin.split("\\s+").toSeq ++ extraArgs
}
@@ -81,6 +97,12 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
if (next == expectedAnswers.size) {
foundAllExpectedAnswers.trySuccess(())
}
+ } else {
+ errorResponses.foreach( r => {
+ if (line.startsWith(r)) {
+ foundAllExpectedAnswers.tryFailure(
+ new RuntimeException(s"Failed with error line '$line'"))
+ }})
}
}
@@ -88,16 +110,44 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
val process = (Process(command, None) #< queryStream).run(
ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
+ // catch the output value
+ class exitCodeCatcher extends Runnable {
+ var exitValue = 0
+
+ override def run(): Unit = {
+ try {
+ exitValue = process.exitValue()
+ } catch {
+ case rte: RuntimeException =>
+ // ignored as it will get triggered when the process gets destroyed
+ logDebug("Ignoring exception while waiting for exit code", rte)
+ }
+ if (exitValue != 0) {
+ // process exited: fail fast
+ foundAllExpectedAnswers.tryFailure(
+ new RuntimeException(s"Failed with exit code $exitValue"))
+ }
+ }
+ }
+ // spin off the code catche thread. No attempt is made to kill this
+ // as it will exit once the launched process terminates.
+ val codeCatcherThread = new Thread(new exitCodeCatcher())
+ codeCatcherThread.start()
+
try {
- Await.result(foundAllExpectedAnswers.future, timeout)
+ Await.ready(foundAllExpectedAnswers.future, timeout)
+ foundAllExpectedAnswers.future.value match {
+ case Some(Failure(t)) => throw t
+ case _ =>
+ }
} catch { case cause: Throwable =>
- logError(
+ val message =
s"""
|=======================
|CliSuite failure output
|=======================
|Spark SQL CLI command line: ${command.mkString(" ")}
- |
+ |Exception: $cause
|Executed query $next "${queries(next)}",
|But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
|
@@ -105,8 +155,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
|===========================
|End CliSuite failure output
|===========================
- """.stripMargin, cause)
- throw cause
+ """.stripMargin
+ logError(message, cause)
+ fail(message, cause)
} finally {
process.destroy()
}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 39b31523e0..8374629b5d 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.thriftserver
import java.io.File
import java.net.URL
-import java.nio.charset.StandardCharsets
import java.sql.{Date, DriverManager, SQLException, Statement}
import scala.collection.mutable.ArrayBuffer
@@ -492,7 +491,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
new File(s"$tempLog4jConf/log4j.properties"),
UTF_8)
- tempLog4jConf + File.pathSeparator + sys.props("java.class.path")
+ tempLog4jConf // + File.pathSeparator + sys.props("java.class.path")
}
s"""$startScript
@@ -508,6 +507,20 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
""".stripMargin.split("\\s+").toSeq
}
+ /**
+ * String to scan for when looking for the the thrift binary endpoint running.
+ * This can change across Hive versions.
+ */
+ val THRIFT_BINARY_SERVICE_LIVE = "Starting ThriftBinaryCLIService on port"
+
+ /**
+ * String to scan for when looking for the the thrift HTTP endpoint running.
+ * This can change across Hive versions.
+ */
+ val THRIFT_HTTP_SERVICE_LIVE = "Started ThriftHttpCLIService in http"
+
+ val SERVER_STARTUP_TIMEOUT = 1.minute
+
private def startThriftServer(port: Int, attempt: Int) = {
warehousePath = Utils.createTempDir()
warehousePath.delete()
@@ -545,23 +558,26 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
// Ensures that the following "tail" command won't fail.
logPath.createNewFile()
+ val successLines = Seq(THRIFT_BINARY_SERVICE_LIVE, THRIFT_HTTP_SERVICE_LIVE)
+ val failureLines = Seq("HiveServer2 is stopped", "Exception in thread", "Error:")
logTailingProcess =
// Using "-n +0" to make sure all lines in the log file are checked.
Process(s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}").run(ProcessLogger(
(line: String) => {
diagnosisBuffer += line
-
- if (line.contains("ThriftBinaryCLIService listening on") ||
- line.contains("Started ThriftHttpCLIService in http")) {
- serverStarted.trySuccess(())
- } else if (line.contains("HiveServer2 is stopped")) {
- // This log line appears when the server fails to start and terminates gracefully (e.g.
- // because of port contention).
- serverStarted.tryFailure(new RuntimeException("Failed to start HiveThriftServer2"))
- }
+ successLines.foreach(r => {
+ if (line.contains(r)) {
+ serverStarted.trySuccess(())
+ }
+ })
+ failureLines.foreach(r => {
+ if (line.contains(r)) {
+ serverStarted.tryFailure(new RuntimeException(s"Failed with output '$line'"))
+ }
+ })
}))
- Await.result(serverStarted.future, 2.minute)
+ Await.result(serverStarted.future, SERVER_STARTUP_TIMEOUT)
}
private def stopThriftServer(): Unit = {