diff options
Diffstat (limited to 'sql/hive-thriftserver')
2 files changed, 81 insertions, 17 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index bd4e99492b..c5b73234fa 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.thriftserver import org.apache.commons.logging.LogFactory import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService +import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor} import org.apache.spark.Logging @@ -85,10 +86,22 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) - val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService) - setSuperField(this, "thriftCLIService", thriftCliService) - addService(thriftCliService) + if (isHTTPTransportMode(hiveConf)) { + val thriftCliService = new ThriftHttpCLIService(sparkSqlCliService) + setSuperField(this, "thriftCLIService", thriftCliService) + addService(thriftCliService) + } else { + val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService) + setSuperField(this, "thriftCLIService", thriftCliService) + addService(thriftCliService) + } initCompositeService(hiveConf) } + + private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = { + val transportMode: String = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE) + transportMode.equalsIgnoreCase("http") + } + } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index 23d12cbff3..94d5ed4f1d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -70,11 +70,20 @@ class HiveThriftServer2Suite extends FunSuite with Logging { port } - def withJdbcStatement(serverStartTimeout: FiniteDuration = 1.minute)(f: Statement => Unit) { + def withJdbcStatement( + serverStartTimeout: FiniteDuration = 1.minute, + httpMode: Boolean = false)( + f: Statement => Unit) { val port = randomListeningPort - startThriftServer(port, serverStartTimeout) { - val jdbcUri = s"jdbc:hive2://${"localhost"}:$port/" + startThriftServer(port, serverStartTimeout, httpMode) { + val jdbcUri = if (httpMode) { + s"jdbc:hive2://${"localhost"}:$port/" + + "default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice" + } else { + s"jdbc:hive2://${"localhost"}:$port/" + } + val user = System.getProperty("user.name") val connection = DriverManager.getConnection(jdbcUri, user, "") val statement = connection.createStatement() @@ -113,7 +122,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging { def startThriftServer( port: Int, - serverStartTimeout: FiniteDuration = 1.minute)( + serverStartTimeout: FiniteDuration = 1.minute, + httpMode: Boolean = false)( f: => Unit) { val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator) val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator) @@ -121,15 +131,28 @@ class HiveThriftServer2Suite extends FunSuite with Logging { val warehousePath = getTempFilePath("warehouse") val metastorePath = getTempFilePath("metastore") val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true" + val command = - s"""$startScript - | --master local - | --hiveconf hive.root.logger=INFO,console - | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri - | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath - | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=${"localhost"} - | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port - """.stripMargin.split("\\s+").toSeq + if (httpMode) { + s"""$startScript + | --master local + | --hiveconf hive.root.logger=INFO,console + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri + | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost + | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=http + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT}=$port + """.stripMargin.split("\\s+").toSeq + } else { + s"""$startScript + | --master local + | --hiveconf hive.root.logger=INFO,console + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri + | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost + | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port + """.stripMargin.split("\\s+").toSeq + } val serverRunning = Promise[Unit]() val buffer = new ArrayBuffer[String]() @@ -140,7 +163,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging { def captureLogOutput(line: String): Unit = { buffer += line - if (line.contains("ThriftBinaryCLIService listening on")) { + if (line.contains("ThriftBinaryCLIService listening on") || + line.contains("Started ThriftHttpCLIService in http")) { serverRunning.success(()) } } @@ -217,6 +241,25 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } + test("Test JDBC query execution in Http Mode") { + withJdbcStatement(httpMode = true) { statement => + val queries = Seq( + "SET spark.sql.shuffle.partitions=3", + "DROP TABLE IF EXISTS test", + "CREATE TABLE test(key INT, val STRING)", + s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", + "CACHE TABLE test") + + queries.foreach(statement.execute) + + assertResult(5, "Row count mismatch") { + val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test") + resultSet.next() + resultSet.getInt(1) + } + } + } + test("SPARK-3004 regression: result set containing NULL") { withJdbcStatement() { statement => val queries = Seq( @@ -267,6 +310,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } + test("Checks Hive version in Http Mode") { + withJdbcStatement(httpMode = true) { statement => + val resultSet = statement.executeQuery("SET spark.sql.hive.version") + resultSet.next() + assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}") + } + } + test("SPARK-4292 regression: result set iterator issue") { withJdbcStatement() { statement => val queries = Seq( |