author     Egor Pakhomov <egor@anchorfree.com>  2016-06-15 14:25:55 -0700
committer  Davies Liu <davies.liu@gmail.com>    2016-06-15 14:29:32 -0700
commit     049e639fc22100af3cbe2ffcf3a2fd3ae2461373 (patch)
tree       417e222494e51a17e47e6f245924a7ed3e30f4f3
parent     09925735b5e53db61ed12abae58864670a3a5f98 (diff)
[SPARK-15934] [SQL] Return binary mode in ThriftServer
Returning binary mode to ThriftServer for backward compatibility.

Tested with Squirrel and Tableau.

Author: Egor Pakhomov <egor@anchorfree.com>

Closes #13667 from epahomov/SPARK-15095-2.0.
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala       | 18
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 41
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala         |  2
3 files changed, 47 insertions(+), 14 deletions(-)
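For reference, a minimal JDBC sketch (not part of this patch) showing how a client connects in each transport mode once the binary service is back; the host, ports, and httpPath below are assumptions based on common HiveServer2 defaults.

import java.sql.DriverManager

object ThriftServerJdbcExample {
  def main(args: Array[String]): Unit = {
    // Standard Hive JDBC driver; it must be on the client classpath.
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // Binary (now the default) transport: a plain host:port URL.
    val binaryUrl = "jdbc:hive2://localhost:10000/default"
    // HTTP transport: the URL also names the transport mode and HTTP path.
    val httpUrl =
      "jdbc:hive2://localhost:10001/default;transportMode=http;httpPath=cliservice"

    val conn = DriverManager.getConnection(binaryUrl, "user", "")
    try {
      val rs = conn.createStatement().executeQuery("SELECT 1")
      while (rs.next()) println(rs.getInt(1))
    } finally {
      conn.close()
    }
  }
}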
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index de70fdc14e..e3258d858f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hive.service.cli.thrift.ThriftHttpCLIService
+import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
import org.apache.hive.service.server.HiveServer2
import org.apache.spark.SparkContext
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.sql.internal.SQLConf
@@ -271,7 +271,7 @@ object HiveThriftServer2 extends Logging {
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
extends HiveServer2
- with ReflectedCompositeService with Logging {
+ with ReflectedCompositeService {
// state is tracked internally so that the server only attempts to shut down if it successfully
// started, and then once only.
private val started = new AtomicBoolean(false)
@@ -281,18 +281,20 @@ private[hive] class HiveThriftServer2(sqlContext: SQLContext)
setSuperField(this, "cliService", sparkSqlCliService)
addService(sparkSqlCliService)
- if (isBinaryTransportMode(hiveConf)) {
- logWarning("Binary mode is not supported, use HTTP mode instead")
+ val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
+ new ThriftHttpCLIService(sparkSqlCliService)
+ } else {
+ new ThriftBinaryCLIService(sparkSqlCliService)
}
- val thriftCliService = new ThriftHttpCLIService(sparkSqlCliService)
+
setSuperField(this, "thriftCLIService", thriftCliService)
addService(thriftCliService)
initCompositeService(hiveConf)
}
- private def isBinaryTransportMode(hiveConf: HiveConf): Boolean = {
+ private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
- transportMode.toLowerCase(Locale.ENGLISH).equals("binary")
+ transportMode.toLowerCase(Locale.ENGLISH).equals("http")
}
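The selection above makes binary the default transport, with HTTP opt-in via hive.server2.transport.mode=http (for example in hive-site.xml or with --hiveconf). A minimal sketch, not part of this patch, of starting the Thrift server programmatically against a Hive-enabled SparkSession and relying on that default:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

object StartThriftServerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("thrift-server-example") // hypothetical app name
      .enableHiveSupport()
      .getOrCreate()

    // With no hive.server2.transport.mode override, the server now comes up
    // with the ThriftBinaryCLIService instead of rejecting binary mode.
    HiveThriftServer2.startWithContext(spark.sqlContext)
  }
}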
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index b3f4944c91..e388c2a082 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -55,8 +55,8 @@ object TestData {
val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
}
-class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
- override def mode: ServerMode.Value = ServerMode.http
+class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
+ override def mode: ServerMode.Value = ServerMode.binary
private def withCLIServiceClient(f: ThriftCLIServiceClient => Unit): Unit = {
// Transport creation logic below mimics HiveConnection.createBinaryTransport
@@ -70,8 +70,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
try f(client) finally transport.close()
}
- // TODO: update this test to work in HTTP mode
- ignore("GetInfo Thrift API") {
+ test("GetInfo Thrift API") {
withCLIServiceClient { client =>
val user = System.getProperty("user.name")
val sessionHandle = client.openSession(user, "")
@@ -567,7 +566,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
}
class SingleSessionSuite extends HiveThriftJdbcTest {
- override def mode: ServerMode.Value = ServerMode.http
+ override def mode: ServerMode.Value = ServerMode.binary
override protected def extraConf: Seq[String] =
"--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
@@ -617,6 +616,38 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
}
}
+class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
+ override def mode: ServerMode.Value = ServerMode.http
+
+ test("JDBC query execution") {
+ withJdbcStatement { statement =>
+ val queries = Seq(
+ "SET spark.sql.shuffle.partitions=3",
+ "DROP TABLE IF EXISTS test",
+ "CREATE TABLE test(key INT, val STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
+ "CACHE TABLE test")
+
+ queries.foreach(statement.execute)
+
+ assertResult(5, "Row count mismatch") {
+ val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test")
+ resultSet.next()
+ resultSet.getInt(1)
+ }
+ }
+ }
+
+ test("Checks Hive version") {
+ withJdbcStatement { statement =>
+ val resultSet = statement.executeQuery("SET spark.sql.hive.version")
+ resultSet.next()
+ assert(resultSet.getString(1) === "spark.sql.hive.version")
+ assert(resultSet.getString(2) === HiveUtils.hiveExecutionVersion)
+ }
+ }
+}
+
object ServerMode extends Enumeration {
val binary, http = Value
}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
index b6b9de1ba6..bf431cd6b0 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
@@ -36,7 +36,7 @@ class UISeleniumSuite
implicit var webDriver: WebDriver = _
var server: HiveThriftServer2 = _
val uiPort = 20000 + Random.nextInt(10000)
- override def mode: ServerMode.Value = ServerMode.http
+ override def mode: ServerMode.Value = ServerMode.binary
override def beforeAll(): Unit = {
webDriver = new HtmlUnitDriver {