From 17688d14299f18a93591818ae5fef69e9dc20eb5 Mon Sep 17 00:00:00 2001 From: Judy Nash Date: Tue, 16 Dec 2014 12:37:26 -0800 Subject: [SQL] SPARK-4700: Add HTTP protocol spark thrift server Add HTTP protocol support and test cases to spark thrift server, so users can deploy thrift server in both TCP and http mode. Author: Judy Nash Author: judynash Closes #3672 from judynash/master and squashes the following commits: 526315d [Judy Nash] correct spacing on startThriftServer method 31a6520 [Judy Nash] fix code style issues and update sql programming guide format issue 47bf87e [Judy Nash] modify withJdbcStatement method definition to meet less than 100 line length 2e9c11c [Judy Nash] add thrift server in http mode documentation on sql programming guide 1cbd305 [Judy Nash] Merge remote-tracking branch 'upstream/master' 2b1d312 [Judy Nash] updated http thrift server support based on feedback 377532c [judynash] add HTTP protocol spark thrift server --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 21 ++++-- .../hive/thriftserver/HiveThriftServer2Suite.scala | 77 ++++++++++++++++++---- 2 files changed, 81 insertions(+), 17 deletions(-) (limited to 'sql/hive-thriftserver') diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index bd4e99492b..c5b73234fa 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.thriftserver import org.apache.commons.logging.LogFactory import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService +import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor} import org.apache.spark.Logging @@ -85,10 +86,22 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) - val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService) - setSuperField(this, "thriftCLIService", thriftCliService) - addService(thriftCliService) + if (isHTTPTransportMode(hiveConf)) { + val thriftCliService = new ThriftHttpCLIService(sparkSqlCliService) + setSuperField(this, "thriftCLIService", thriftCliService) + addService(thriftCliService) + } else { + val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService) + setSuperField(this, "thriftCLIService", thriftCliService) + addService(thriftCliService) + } initCompositeService(hiveConf) } + + private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = { + val transportMode: String = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE) + transportMode.equalsIgnoreCase("http") + } + } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index 23d12cbff3..94d5ed4f1d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -70,11 +70,20 @@ class HiveThriftServer2Suite extends FunSuite with Logging { port } - def withJdbcStatement(serverStartTimeout: FiniteDuration = 1.minute)(f: Statement => Unit) { + def withJdbcStatement( + serverStartTimeout: FiniteDuration = 1.minute, + httpMode: Boolean = false)( + f: Statement => Unit) { val port = randomListeningPort - startThriftServer(port, serverStartTimeout) { - val jdbcUri = s"jdbc:hive2://${"localhost"}:$port/" + startThriftServer(port, serverStartTimeout, httpMode) { + val jdbcUri = if (httpMode) { + s"jdbc:hive2://${"localhost"}:$port/" + + "default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice" + } else { + s"jdbc:hive2://${"localhost"}:$port/" + } + val user = System.getProperty("user.name") val connection = DriverManager.getConnection(jdbcUri, user, "") val statement = connection.createStatement() @@ -113,7 +122,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging { def startThriftServer( port: Int, - serverStartTimeout: FiniteDuration = 1.minute)( + serverStartTimeout: FiniteDuration = 1.minute, + httpMode: Boolean = false)( f: => Unit) { val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator) val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator) @@ -121,15 +131,28 @@ class HiveThriftServer2Suite extends FunSuite with Logging { val warehousePath = getTempFilePath("warehouse") val metastorePath = getTempFilePath("metastore") val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true" + val command = - s"""$startScript - | --master local - | --hiveconf hive.root.logger=INFO,console - | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri - | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath - | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=${"localhost"} - | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port - """.stripMargin.split("\\s+").toSeq + if (httpMode) { + s"""$startScript + | --master local + | --hiveconf hive.root.logger=INFO,console + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri + | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost + | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=http + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT}=$port + """.stripMargin.split("\\s+").toSeq + } else { + s"""$startScript + | --master local + | --hiveconf hive.root.logger=INFO,console + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri + | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port + """.stripMargin.split("\\s+").toSeq + } val serverRunning = Promise[Unit]() val buffer = new ArrayBuffer[String]() @@ -140,7 +163,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging { def captureLogOutput(line: String): Unit = { buffer += line - if (line.contains("ThriftBinaryCLIService listening on")) { + if (line.contains("ThriftBinaryCLIService listening on") || + line.contains("Started ThriftHttpCLIService in http")) { serverRunning.success(()) } } @@ -217,6 +241,25 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } + test("Test JDBC query execution in Http Mode") { + withJdbcStatement(httpMode = true) { statement => + val queries = Seq( + "SET spark.sql.shuffle.partitions=3", + "DROP TABLE IF EXISTS test", + "CREATE TABLE test(key INT, val STRING)", + s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", + "CACHE TABLE test") + + queries.foreach(statement.execute) + + assertResult(5, "Row count mismatch") { + val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test") + resultSet.next() + resultSet.getInt(1) + } + } + } + test("SPARK-3004 regression: result set containing NULL") { withJdbcStatement() { statement => val queries = Seq( @@ -267,6 +310,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } + test("Checks Hive version in Http Mode") { + withJdbcStatement(httpMode = true) { statement => + val resultSet = statement.executeQuery("SET spark.sql.hive.version") + resultSet.next() + assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}") + } + } + test("SPARK-4292 regression: result set iterator issue") { withJdbcStatement() { statement => val queries = Seq( -- cgit v1.2.3