diff options
3 files changed, 14 insertions, 47 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 94b1ced990..cef5912c62 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.commons.logging.LogFactory import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} +import org.apache.hive.service.cli.thrift.ThriftHttpCLIService import org.apache.hive.service.server.HiveServer2 import org.apache.spark.SparkContext @@ -34,7 +34,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils} +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab import org.apache.spark.sql.internal.SQLConf @@ -271,7 +271,7 @@ object HiveThriftServer2 extends Logging { private[hive] class HiveThriftServer2(sqlContext: SQLContext) extends HiveServer2 - with ReflectedCompositeService { + with ReflectedCompositeService with Logging { // state is tracked internally so that the server only attempts to shut down if it successfully // started, and then once only. private val started = new AtomicBoolean(false) @@ -281,20 +281,18 @@ private[hive] class HiveThriftServer2(sqlContext: SQLContext) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) - val thriftCliService = if (isHTTPTransportMode(hiveConf)) { - new ThriftHttpCLIService(sparkSqlCliService) - } else { - new ThriftBinaryCLIService(sparkSqlCliService) + if (isBinaryTransportMode(hiveConf)) { + logWarning("Binary mode is not supported, use HTTP mode instead") } - + val thriftCliService = new ThriftHttpCLIService(sparkSqlCliService) setSuperField(this, "thriftCLIService", thriftCliService) addService(thriftCliService) initCompositeService(hiveConf) } - private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = { + private def isBinaryTransportMode(hiveConf: HiveConf): Boolean = { val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE) - transportMode.toLowerCase(Locale.ENGLISH).equals("http") + transportMode.toLowerCase(Locale.ENGLISH).equals("binary") } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 55a93ea06b..1a894ae857 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -55,8 +55,8 @@ object TestData { val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt") } -class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { - override def mode: ServerMode.Value = ServerMode.binary +class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { + override def mode: ServerMode.Value = ServerMode.http private def withCLIServiceClient(f: ThriftCLIServiceClient => Unit): Unit = { // Transport creation logic below mimics HiveConnection.createBinaryTransport @@ -70,7 +70,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { try f(client) finally transport.close() } - test("GetInfo Thrift API") { + // TODO: update this test to work in HTTP mode + ignore("GetInfo Thrift API") { withCLIServiceClient { client => val user = System.getProperty("user.name") val sessionHandle = client.openSession(user, "") @@ -566,7 +567,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } class SingleSessionSuite extends HiveThriftJdbcTest { - override def mode: ServerMode.Value = ServerMode.binary + override def mode: ServerMode.Value = ServerMode.http override protected def extraConf: Seq[String] = "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil @@ -616,38 +617,6 @@ class SingleSessionSuite extends HiveThriftJdbcTest { } } -class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { - override def mode: ServerMode.Value = ServerMode.http - - test("JDBC query execution") { - withJdbcStatement { statement => - val queries = Seq( - "SET spark.sql.shuffle.partitions=3", - "DROP TABLE IF EXISTS test", - "CREATE TABLE test(key INT, val STRING)", - s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", - "CACHE TABLE test") - - queries.foreach(statement.execute) - - assertResult(5, "Row count mismatch") { - val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test") - resultSet.next() - resultSet.getInt(1) - } - } - } - - test("Checks Hive version") { - withJdbcStatement { statement => - val resultSet = statement.executeQuery("SET spark.sql.hive.version") - resultSet.next() - assert(resultSet.getString(1) === "spark.sql.hive.version") - assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion) - } - } -} - object ServerMode extends Enumeration { val binary, http = Value } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala index bf431cd6b0..b6b9de1ba6 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala @@ -36,7 +36,7 @@ class UISeleniumSuite implicit var webDriver: WebDriver = _ var server: HiveThriftServer2 = _ val uiPort = 20000 + Random.nextInt(10000) - override def mode: ServerMode.Value = ServerMode.binary + override def mode: ServerMode.Value = ServerMode.http override def beforeAll(): Unit = { webDriver = new HtmlUnitDriver { |