diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-12-28 10:16:22 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-12-28 10:16:22 +0800 |
commit | 5ac62043cf6dc12c986e5ae9d9661fd439f8b5b9 (patch) | |
tree | 6307b2701a86255ed3d34982e99b0ad53a3a1ccb /sql/hive-thriftserver | |
parent | 28ab0ec49fa9bac1c4a246a44a5d1ad163660e1a (diff) | |
download | spark-5ac62043cf6dc12c986e5ae9d9661fd439f8b5b9.tar.gz spark-5ac62043cf6dc12c986e5ae9d9661fd439f8b5b9.tar.bz2 spark-5ac62043cf6dc12c986e5ae9d9661fd439f8b5b9.zip |
[SPARK-18992][SQL] Move spark.sql.hive.thriftServer.singleSession to SQLConf
### What changes were proposed in this pull request?
Since `spark.sql.hive.thriftServer.singleSession` is a configuration of SQL component, this conf can be moved from `SparkConf` to `StaticSQLConf`.
When we introduced `spark.sql.hive.thriftServer.singleSession`, all the SQL configurations were session specific. They could be modified in different sessions.
In Spark 2.1, static SQL configuration is added. It is a perfect fit for `spark.sql.hive.thriftServer.singleSession`. Previously, we did the same move for `spark.sql.warehouse.dir` from `SparkConf` to `StaticSQLConf`
### How was this patch tested?
Added test cases in HiveThriftServer2Suites.scala
Author: gatorsmile <gatorsmile@gmail.com>
Closes #16392 from gatorsmile/hiveThriftServerSingleSession.
Diffstat (limited to 'sql/hive-thriftserver')
3 files changed, 77 insertions, 45 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 226b7e175a..7adaafe5ad 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -28,7 +28,7 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.hive.service.server.HiveServer2 import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager @@ -72,8 +72,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: val session = super.getSession(sessionHandle) HiveThriftServer2.listener.onSessionCreated( session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - val ctx = if (sessionState.hiveThriftServerSingleSession) { + val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) { sqlContext } else { sqlContext.newSession() diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 5d20ec958c..b6215bde6b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -98,9 +98,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val user = 
System.getProperty("user.name") val sessionHandle = client.openSession(user, "") - withJdbcStatement { statement => + withJdbcStatement("test_16563") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_16563", "CREATE TABLE test_16563(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_16563") @@ -134,16 +133,14 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { rows_first.numRows() } - statement.executeQuery("DROP TABLE IF EXISTS test_16563") } } } test("JDBC query execution") { - withJdbcStatement { statement => + withJdbcStatement("test") { statement => val queries = Seq( "SET spark.sql.shuffle.partitions=3", - "DROP TABLE IF EXISTS test", "CREATE TABLE test(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", "CACHE TABLE test") @@ -159,7 +156,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("Checks Hive version") { - withJdbcStatement { statement => + withJdbcStatement() { statement => val resultSet = statement.executeQuery("SET spark.sql.hive.version") resultSet.next() assert(resultSet.getString(1) === "spark.sql.hive.version") @@ -168,9 +165,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-3004 regression: result set containing NULL") { - withJdbcStatement { statement => + withJdbcStatement("test_null") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_null", "CREATE TABLE test_null(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null") @@ -189,9 +185,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-4292 regression: result set iterator issue") { - withJdbcStatement { statement => + withJdbcStatement("test_4292") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_4292", "CREATE TABLE test_4292(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO 
TABLE test_4292") @@ -203,15 +198,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { resultSet.next() assert(resultSet.getInt(1) === key) } - - statement.executeQuery("DROP TABLE IF EXISTS test_4292") } } test("SPARK-4309 regression: Date type support") { - withJdbcStatement { statement => + withJdbcStatement("test_date") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_date", "CREATE TABLE test_date(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date") @@ -227,9 +219,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-4407 regression: Complex type support") { - withJdbcStatement { statement => + withJdbcStatement("test_map") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_map", "CREATE TABLE test_map(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") @@ -251,9 +242,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-12143 regression: Binary type support") { - withJdbcStatement { statement => + withJdbcStatement("test_binary") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_binary", "CREATE TABLE test_binary(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_binary") @@ -262,7 +252,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val expected: Array[Byte] = "val_238".getBytes assertResult(expected) { val resultSet = statement.executeQuery( - "SELECT CAST(value as BINARY) FROM test_date LIMIT 1") + "SELECT CAST(value as BINARY) FROM test_binary LIMIT 1") resultSet.next() resultSet.getObject(1) } @@ -275,12 +265,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { var defaultV2: String = null var data: ArrayBuffer[Int] = null - withMultipleConnectionJdbcStatement( + withMultipleConnectionJdbcStatement("test_map")( // create table { statement => val queries = Seq( - 
"DROP TABLE IF EXISTS test_map", "CREATE TABLE test_map(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map", "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC", @@ -418,9 +407,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { // This test often hangs and then times out, leaving the hanging processes. // Let's ignore it and improve the test. ignore("test jdbc cancel") { - withJdbcStatement { statement => + withJdbcStatement("test_map") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_map", "CREATE TABLE test_map(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") @@ -478,7 +466,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("test add jar") { - withMultipleConnectionJdbcStatement( + withMultipleConnectionJdbcStatement("smallKV", "addJar")( { statement => val jarFile = @@ -492,10 +480,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { { statement => val queries = Seq( - "DROP TABLE IF EXISTS smallKV", "CREATE TABLE smallKV(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE smallKV", - "DROP TABLE IF EXISTS addJar", """CREATE TABLE addJar(key string) |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' """.stripMargin) @@ -524,15 +510,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { expectedResult.close() assert(expectedResultBuffer === actualResultBuffer) - - statement.executeQuery("DROP TABLE IF EXISTS addJar") - statement.executeQuery("DROP TABLE IF EXISTS smallKV") } ) } test("Checks Hive version via SET -v") { - withJdbcStatement { statement => + withJdbcStatement() { statement => val resultSet = statement.executeQuery("SET -v") val conf = mutable.Map.empty[String, String] @@ -545,7 +528,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("Checks Hive version via SET") { - 
withJdbcStatement { statement => + withJdbcStatement() { statement => val resultSet = statement.executeQuery("SET") val conf = mutable.Map.empty[String, String] @@ -558,7 +541,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-11595 ADD JAR with input path having URL scheme") { - withJdbcStatement { statement => + withJdbcStatement("test_udtf") { statement => try { val jarPath = "../hive/src/test/resources/TestUDTF.jar" val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" @@ -586,7 +569,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val dataPath = "../hive/src/test/resources/data/files/kv1.txt" Seq( - s"CREATE TABLE test_udtf(key INT, value STRING)", + "CREATE TABLE test_udtf(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf" ).foreach(statement.execute) @@ -624,8 +607,8 @@ class SingleSessionSuite extends HiveThriftJdbcTest { override protected def extraConf: Seq[String] = "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil - test("test single session") { - withMultipleConnectionJdbcStatement( + test("share the temporary functions across JDBC connections") { + withMultipleConnectionJdbcStatement()( { statement => val jarPath = "../hive/src/test/resources/TestUDTF.jar" val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" @@ -667,16 +650,63 @@ class SingleSessionSuite extends HiveThriftJdbcTest { } ) } + + test("unable to changing spark.sql.hive.thriftServer.singleSession using JDBC connections") { + withJdbcStatement() { statement => + // JDBC connections are not able to set the conf spark.sql.hive.thriftServer.singleSession + val e = intercept[SQLException] { + statement.executeQuery("SET spark.sql.hive.thriftServer.singleSession=false") + }.getMessage + assert(e.contains( + "Cannot modify the value of a static config: spark.sql.hive.thriftServer.singleSession")) + } + } + + test("share the current database and temporary tables across 
JDBC connections") { + withMultipleConnectionJdbcStatement()( + { statement => + statement.execute("CREATE DATABASE IF NOT EXISTS db1") + }, + + { statement => + val rs1 = statement.executeQuery("SELECT current_database()") + assert(rs1.next()) + assert(rs1.getString(1) === "default") + + statement.execute("USE db1") + + val rs2 = statement.executeQuery("SELECT current_database()") + assert(rs2.next()) + assert(rs2.getString(1) === "db1") + + statement.execute("CREATE TEMP VIEW tempView AS SELECT 123") + }, + + { statement => + // the current database is set to db1 by another JDBC connection. + val rs1 = statement.executeQuery("SELECT current_database()") + assert(rs1.next()) + assert(rs1.getString(1) === "db1") + + val rs2 = statement.executeQuery("SELECT * from tempView") + assert(rs2.next()) + assert(rs2.getString(1) === "123") + + statement.execute("USE default") + statement.execute("DROP VIEW tempView") + statement.execute("DROP DATABASE db1 CASCADE") + } + ) + } } class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { override def mode: ServerMode.Value = ServerMode.http test("JDBC query execution") { - withJdbcStatement { statement => + withJdbcStatement("test") { statement => val queries = Seq( "SET spark.sql.shuffle.partitions=3", - "DROP TABLE IF EXISTS test", "CREATE TABLE test(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", "CACHE TABLE test") @@ -692,7 +722,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { } test("Checks Hive version") { - withJdbcStatement { statement => + withJdbcStatement() { statement => val resultSet = statement.executeQuery("SET spark.sql.hive.version") resultSet.next() assert(resultSet.getString(1) === "spark.sql.hive.version") @@ -718,7 +748,7 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test { s"jdbc:hive2://localhost:$serverPort/" } - def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) { + def 
withMultipleConnectionJdbcStatement(tableNames: String*)(fs: (Statement => Unit)*) { val user = System.getProperty("user.name") val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") } val statements = connections.map(_.createStatement()) @@ -726,13 +756,16 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test { try { statements.zip(fs).foreach { case (s, f) => f(s) } } finally { + tableNames.foreach { name => + statements(0).execute(s"DROP TABLE IF EXISTS $name") + } statements.foreach(_.close()) connections.foreach(_.close()) } } - def withJdbcStatement(f: Statement => Unit) { - withMultipleConnectionJdbcStatement(f) + def withJdbcStatement(tableNames: String*)(f: Statement => Unit) { + withMultipleConnectionJdbcStatement(tableNames: _*)(f) } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala index bf431cd6b0..4c53dd8f46 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala @@ -74,7 +74,7 @@ class UISeleniumSuite } ignore("thrift server ui test") { - withJdbcStatement { statement => + withJdbcStatement("test_map") { statement => val baseURL = s"http://localhost:$uiPort" val queries = Seq( |