aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-11-02 15:18:29 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-02 15:18:29 -0800
commitc9f840046f8c45b1137f0289eeb0c980de72ea5e (patch)
treec867149b1f31ab6845ccbb412b73980b1c65b2a2 /sql/hive-thriftserver
parent495a132031ae002c787371f2fd0ba4be2437e7c8 (diff)
downloadspark-c9f840046f8c45b1137f0289eeb0c980de72ea5e.tar.gz
spark-c9f840046f8c45b1137f0289eeb0c980de72ea5e.tar.bz2
spark-c9f840046f8c45b1137f0289eeb0c980de72ea5e.zip
[SPARK-3791][SQL] Provides Spark version and Hive version in HiveThriftServer2
This PR overrides the `GetInfo` Hive Thrift API to provide correct version information. Another property `spark.sql.hive.version` is added to reveal the underlying Hive version. These are generally useful for Spark SQL ODBC driver providers. The Spark version information is extracted from the jar manifest. Also took the chance to remove the `SET -v` hack, which was a workaround for Simba ODBC driver connectivity. TODO - [x] Find a general way to figure out Hive (or even any dependency) version. This [blog post](http://blog.soebes.de/blog/2014/01/02/version-information-into-your-appas-with-maven/) suggests several methods to inspect application version. In the case of Spark, this can be tricky because the chosen method: 1. must applies to both Maven build and SBT build For Maven builds, we can retrieve the version information from the META-INF/maven directory within the assembly jar. But this doesn't work for SBT builds. 2. must not rely on the original jars of dependencies to extract specific dependency version, because Spark uses assembly jar. This implies we can't read Hive version from Hive jar files since standard Spark distribution doesn't include them. 3. should play well with `SPARK_PREPEND_CLASSES` to ease local testing during development. `SPARK_PREPEND_CLASSES` prevents classes to be loaded from the assembly jar, thus we can't locate the jar file and read its manifest. Given these, maybe the only reliable method is to generate a source file containing version information at build time. pwendell Do you have any suggestions from the perspective of the build process? **Update** Hive version is now retrieved from the newly introduced `HiveShim` object. Author: Cheng Lian <lian.cs.zju@gmail.com> Author: Cheng Lian <lian@databricks.com> Closes #2843 from liancheng/get-info and squashes the following commits: a873d0f [Cheng Lian] Updates test case 53f43cd [Cheng Lian] Retrieves underlying Hive verson via HiveShim 1d282b8 [Cheng Lian] Removes the Simba ODBC "SET -v" hack f857fce [Cheng Lian] Overrides Hive GetInfo Thrift API and adds Hive version property
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala14
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala11
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala144
3 files changed, 130 insertions, 39 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
index a78311fc48..ecfb74473e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive.thriftserver
+import java.util.jar.Attributes.Name
+
import scala.collection.JavaConversions._
import java.io.IOException
@@ -29,11 +31,12 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
-import org.apache.hive.service.cli.CLIService
+import org.apache.hive.service.cli._
import org.apache.hive.service.{AbstractService, Service, ServiceException}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.util.Utils
private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
extends CLIService
@@ -60,6 +63,15 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
initCompositeService(hiveConf)
}
+
+ override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = {
+ getInfoType match {
+ case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL")
+ case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL")
+ case GetInfoType.CLI_DBMS_VER => new GetInfoValue(Utils.sparkVersion)
+ case _ => super.getInfo(sessionHandle, getInfoType)
+ }
+ }
}
private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 5042586351..89732c939b 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql.hive.thriftserver
+import scala.collection.JavaConversions._
+
import org.apache.spark.scheduler.StatsReportListener
-import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.{HiveShim, HiveContext}
import org.apache.spark.{Logging, SparkConf, SparkContext}
-import scala.collection.JavaConversions._
/** A singleton object for the master program. The slaves should not access this. */
private[hive] object SparkSQLEnv extends Logging {
@@ -31,8 +32,10 @@ private[hive] object SparkSQLEnv extends Logging {
def init() {
if (hiveContext == null) {
- sparkContext = new SparkContext(new SparkConf()
- .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
+ val sparkConf = new SparkConf()
+ .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")
+ .set("spark.sql.hive.version", HiveShim.version)
+ sparkContext = new SparkContext(sparkConf)
sparkContext.addSparkListener(new StatsReportListener())
hiveContext = new HiveContext(sparkContext)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index c60e8fa5b1..65d910a0c3 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -30,42 +30,95 @@ import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.jdbc.HiveDriver
+import org.apache.hive.service.auth.PlainSaslHelper
+import org.apache.hive.service.cli.GetInfoType
+import org.apache.hive.service.cli.thrift.TCLIService.Client
+import org.apache.hive.service.cli.thrift._
+import org.apache.thrift.protocol.TBinaryProtocol
+import org.apache.thrift.transport.TSocket
import org.scalatest.FunSuite
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.hive.HiveShim
/**
* Tests for the HiveThriftServer2 using JDBC.
+ *
+ * NOTE: SPARK_PREPEND_CLASSES is explicitly disabled in this test suite. Assembly jar must be
+ * rebuilt after changing HiveThriftServer2 related code.
*/
class HiveThriftServer2Suite extends FunSuite with Logging {
Class.forName(classOf[HiveDriver].getCanonicalName)
- def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
+ def randomListeningPort = {
+ // Let the system to choose a random available port to avoid collision with other parallel
+ // builds.
+ val socket = new ServerSocket(0)
+ val port = socket.getLocalPort
+ socket.close()
+ port
+ }
+
+ def withJdbcStatement(serverStartTimeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
+ val port = randomListeningPort
+
+ startThriftServer(port, serverStartTimeout) {
+ val jdbcUri = s"jdbc:hive2://${"localhost"}:$port/"
+ val user = System.getProperty("user.name")
+ val connection = DriverManager.getConnection(jdbcUri, user, "")
+ val statement = connection.createStatement()
+
+ try {
+ f(statement)
+ } finally {
+ statement.close()
+ connection.close()
+ }
+ }
+ }
+
+ def withCLIServiceClient(
+ serverStartTimeout: FiniteDuration = 1.minute)(
+ f: ThriftCLIServiceClient => Unit) {
+ val port = randomListeningPort
+
+ startThriftServer(port) {
+ // Transport creation logics below mimics HiveConnection.createBinaryTransport
+ val rawTransport = new TSocket("localhost", port)
+ val user = System.getProperty("user.name")
+ val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport)
+ val protocol = new TBinaryProtocol(transport)
+ val client = new ThriftCLIServiceClient(new Client(protocol))
+
+ transport.open()
+
+ try {
+ f(client)
+ } finally {
+ transport.close()
+ }
+ }
+ }
+
+ def startThriftServer(
+ port: Int,
+ serverStartTimeout: FiniteDuration = 1.minute)(
+ f: => Unit) {
val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
val warehousePath = getTempFilePath("warehouse")
val metastorePath = getTempFilePath("metastore")
val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
- val listeningHost = "localhost"
- val listeningPort = {
- // Let the system to choose a random available port to avoid collision with other parallel
- // builds.
- val socket = new ServerSocket(0)
- val port = socket.getLocalPort
- socket.close()
- port
- }
-
val command =
s"""$startScript
| --master local
| --hiveconf hive.root.logger=INFO,console
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
- | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=$listeningHost
- | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort
+ | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=${"localhost"}
+ | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port
""".stripMargin.split("\\s+").toSeq
val serverRunning = Promise[Unit]()
@@ -92,31 +145,25 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
}
}
- // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
- Process(command, None, "SPARK_TESTING" -> "0").run(ProcessLogger(
+ val env = Seq(
+ // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+ "SPARK_TESTING" -> "0",
+ // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read
+ // proper version information from the jar manifest.
+ "SPARK_PREPEND_CLASSES" -> "")
+
+ Process(command, None, env: _*).run(ProcessLogger(
captureThriftServerOutput("stdout"),
captureThriftServerOutput("stderr")))
- val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
- val user = System.getProperty("user.name")
-
try {
- Await.result(serverRunning.future, timeout)
-
- val connection = DriverManager.getConnection(jdbcUri, user, "")
- val statement = connection.createStatement()
-
- try {
- f(statement)
- } finally {
- statement.close()
- connection.close()
- }
+ Await.result(serverRunning.future, serverStartTimeout)
+ f
} catch {
case cause: Exception =>
cause match {
case _: TimeoutException =>
- logError(s"Failed to start Hive Thrift server within $timeout", cause)
+ logError(s"Failed to start Hive Thrift server within $serverStartTimeout", cause)
case _ =>
}
logError(
@@ -125,8 +172,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|HiveThriftServer2Suite failure output
|=====================================
|HiveThriftServer2 command line: ${command.mkString(" ")}
- |JDBC URI: $jdbcUri
- |User: $user
+ |Binding port: $port
+ |System user: ${System.getProperty("user.name")}
|
|${buffer.mkString("\n")}
|=========================================
@@ -146,7 +193,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
}
test("Test JDBC query execution") {
- startThriftServerWithin() { statement =>
+ withJdbcStatement() { statement =>
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
@@ -168,7 +215,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
}
test("SPARK-3004 regression: result set containing NULL") {
- startThriftServerWithin() { statement =>
+ withJdbcStatement() { statement =>
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource(
"data/files/small_kv_with_null.txt")
@@ -191,4 +238,33 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
assert(!resultSet.next())
}
}
+
+ test("GetInfo Thrift API") {
+ withCLIServiceClient() { client =>
+ val user = System.getProperty("user.name")
+ val sessionHandle = client.openSession(user, "")
+
+ assertResult("Spark SQL", "Wrong GetInfo(CLI_DBMS_NAME) result") {
+ client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_NAME).getStringValue
+ }
+
+ assertResult("Spark SQL", "Wrong GetInfo(CLI_SERVER_NAME) result") {
+ client.getInfo(sessionHandle, GetInfoType.CLI_SERVER_NAME).getStringValue
+ }
+
+ assertResult(true, "Spark version shouldn't be \"Unknown\"") {
+ val version = client.getInfo(sessionHandle, GetInfoType.CLI_DBMS_VER).getStringValue
+ logInfo(s"Spark version: $version")
+ version != "Unknown"
+ }
+ }
+ }
+
+ test("Checks Hive version") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SET spark.sql.hive.version")
+ resultSet.next()
+ assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
+ }
+ }
}