diff options
author | Judy Nash <judynash@microsoft.com> | 2014-12-16 12:37:26 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-12-16 12:37:26 -0800 |
commit | 17688d14299f18a93591818ae5fef69e9dc20eb5 (patch) | |
tree | 1e14688745ff26dd427d229c78d0caf7d40de7ba | |
parent | d12c0711faa3d4333513fcbbbee4868bcb784a26 (diff) | |
download | spark-17688d14299f18a93591818ae5fef69e9dc20eb5.tar.gz spark-17688d14299f18a93591818ae5fef69e9dc20eb5.tar.bz2 spark-17688d14299f18a93591818ae5fef69e9dc20eb5.zip |
[SQL] SPARK-4700: Add HTTP protocol spark thrift server
Add HTTP protocol support and test cases to spark thrift server, so users can deploy thrift server in both TCP and http mode.
Author: Judy Nash <judynash@microsoft.com>
Author: judynash <judynash@microsoft.com>
Closes #3672 from judynash/master and squashes the following commits:
526315d [Judy Nash] correct spacing on startThriftServer method
31a6520 [Judy Nash] fix code style issues and update sql programming guide format issue
47bf87e [Judy Nash] modify withJdbcStatement method definition to meet less than 100 line length
2e9c11c [Judy Nash] add thrift server in http mode documentation on sql programming guide
1cbd305 [Judy Nash] Merge remote-tracking branch 'upstream/master'
2b1d312 [Judy Nash] updated http thrift server support based on feedback
377532c [judynash] add HTTP protocol spark thrift server
3 files changed, 93 insertions, 17 deletions
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index be284fbe21..ad51b9cf41 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -938,6 +938,18 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. You may also use the beeline script that comes with Hive. +Thrift JDBC server also supports sending thrift RPC messages over HTTP transport. +Use the following setting to enable HTTP mode as system property or in `hive-site.xml` file in `conf/`: + + hive.server2.transport.mode - Set this to value: http + hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 + hive.server2.http.endpoint - HTTP endpoint; default is cliservice + +To test, use beeline to connect to the JDBC/ODBC server in http mode with: + + beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint> + + ## Running the Spark SQL CLI The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index bd4e99492b..c5b73234fa 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.thriftserver import org.apache.commons.logging.LogFactory import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService +import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor} import org.apache.spark.Logging @@ -85,10 +86,22 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) - val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService) - setSuperField(this, "thriftCLIService", thriftCliService) - addService(thriftCliService) + if (isHTTPTransportMode(hiveConf)) { + val thriftCliService = new ThriftHttpCLIService(sparkSqlCliService) + setSuperField(this, "thriftCLIService", thriftCliService) + addService(thriftCliService) + } else { + val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService) + setSuperField(this, "thriftCLIService", thriftCliService) + addService(thriftCliService) + } initCompositeService(hiveConf) } + + private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = { + val transportMode: String = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE) + transportMode.equalsIgnoreCase("http") + } + } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index 23d12cbff3..94d5ed4f1d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -70,11 +70,20 @@ class HiveThriftServer2Suite extends FunSuite with Logging { port } - def withJdbcStatement(serverStartTimeout: FiniteDuration = 1.minute)(f: Statement => Unit) { + def withJdbcStatement( + serverStartTimeout: FiniteDuration = 1.minute, + httpMode: Boolean = false)( + f: Statement => Unit) { val port = randomListeningPort - startThriftServer(port, serverStartTimeout) { - val jdbcUri = s"jdbc:hive2://${"localhost"}:$port/" + startThriftServer(port, serverStartTimeout, httpMode) { + val jdbcUri = if (httpMode) { + s"jdbc:hive2://${"localhost"}:$port/" + + "default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice" + } else { + s"jdbc:hive2://${"localhost"}:$port/" + } + val user = System.getProperty("user.name") val connection = DriverManager.getConnection(jdbcUri, user, "") val statement = connection.createStatement() @@ -113,7 +122,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging { def startThriftServer( port: Int, - serverStartTimeout: FiniteDuration = 1.minute)( + serverStartTimeout: FiniteDuration = 1.minute, + httpMode: Boolean = false)( f: => Unit) { val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator) val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator) @@ -121,15 +131,28 @@ class HiveThriftServer2Suite extends FunSuite with Logging { val warehousePath = getTempFilePath("warehouse") val metastorePath = getTempFilePath("metastore") val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true" + val command = - s"""$startScript - | --master local - | --hiveconf hive.root.logger=INFO,console - | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri - | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath - | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=${"localhost"} - | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port - """.stripMargin.split("\\s+").toSeq + if (httpMode) { + s"""$startScript + | --master local + | --hiveconf hive.root.logger=INFO,console + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri + | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost + | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=http + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT}=$port + """.stripMargin.split("\\s+").toSeq + } else { + s"""$startScript + | --master local + | --hiveconf hive.root.logger=INFO,console + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri + | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port + """.stripMargin.split("\\s+").toSeq + } val serverRunning = Promise[Unit]() val buffer = new ArrayBuffer[String]() @@ -140,7 +163,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging { def captureLogOutput(line: String): Unit = { buffer += line - if (line.contains("ThriftBinaryCLIService listening on")) { + if (line.contains("ThriftBinaryCLIService listening on") || + line.contains("Started ThriftHttpCLIService in http")) { serverRunning.success(()) } } @@ -217,6 +241,25 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } + test("Test JDBC query execution in Http Mode") { + withJdbcStatement(httpMode = true) { statement => + val queries = Seq( + "SET spark.sql.shuffle.partitions=3", + "DROP TABLE IF EXISTS test", + "CREATE TABLE test(key INT, val STRING)", + s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", + "CACHE TABLE test") + + queries.foreach(statement.execute) + + assertResult(5, "Row count mismatch") { + val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test") + resultSet.next() + resultSet.getInt(1) + } + } + } + test("SPARK-3004 regression: result set containing NULL") { withJdbcStatement() { statement => val queries = Seq( @@ -267,6 +310,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } + test("Checks Hive version in Http Mode") { + withJdbcStatement(httpMode = true) { statement => + val resultSet = statement.executeQuery("SET spark.sql.hive.version") + resultSet.next() + assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}") + } + } + test("SPARK-4292 regression: result set iterator issue") { withJdbcStatement() { statement => val queries = Seq( |