diff options
5 files changed, 87 insertions, 51 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index cce16264d9..304dcb691b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -831,6 +831,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString + def hiveThriftServerSingleSession: Boolean = + getConf(StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION) + override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL) @@ -1008,4 +1011,11 @@ object StaticSQLConf { .doc("Only used for internal debugging. Not all functions are supported when it is enabled.") .booleanConf .createWithDefault(false) + + val HIVE_THRIFT_SERVER_SINGLESESSION = buildConf("spark.sql.hive.thriftServer.singleSession") + .doc("When set to true, Hive Thrift server is running in a single session mode. " + + "All the JDBC/ODBC connections share the temporary views, function registries, " + + "SQL configuration and the current database.") + .booleanConf + .createWithDefault(false) } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 226b7e175a..7adaafe5ad 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -28,7 +28,7 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.hive.service.server.HiveServer2 import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager @@ -72,8 +72,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: val session = super.getSession(sessionHandle) HiveThriftServer2.listener.onSessionCreated( session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - val ctx = if (sessionState.hiveThriftServerSingleSession) { + val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) { sqlContext } else { sqlContext.newSession() diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 5d20ec958c..b6215bde6b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -98,9 +98,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val user = System.getProperty("user.name") val sessionHandle = client.openSession(user, "") - withJdbcStatement { statement => + withJdbcStatement("test_16563") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_16563", "CREATE TABLE test_16563(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_16563") @@ -134,16 +133,14 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { rows_first.numRows() } - statement.executeQuery("DROP TABLE IF EXISTS test_16563") } } } test("JDBC query execution") { - withJdbcStatement { statement => + withJdbcStatement("test") { statement => val queries = Seq( "SET spark.sql.shuffle.partitions=3", - "DROP TABLE IF EXISTS test", "CREATE TABLE test(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", "CACHE TABLE test") @@ -159,7 +156,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("Checks Hive version") { - withJdbcStatement { statement => + withJdbcStatement() { statement => val resultSet = statement.executeQuery("SET spark.sql.hive.version") resultSet.next() assert(resultSet.getString(1) === "spark.sql.hive.version") @@ -168,9 +165,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-3004 regression: result set containing NULL") { - withJdbcStatement { statement => + withJdbcStatement("test_null") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_null", "CREATE TABLE test_null(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null") @@ -189,9 +185,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-4292 regression: result set iterator issue") { - withJdbcStatement { statement => + withJdbcStatement("test_4292") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_4292", "CREATE TABLE test_4292(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292") @@ -203,15 +198,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { resultSet.next() assert(resultSet.getInt(1) === key) } - - statement.executeQuery("DROP TABLE IF EXISTS test_4292") } } test("SPARK-4309 regression: Date type support") { - withJdbcStatement { statement => + withJdbcStatement("test_date") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_date", "CREATE TABLE test_date(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date") @@ -227,9 +219,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-4407 regression: Complex type support") { - withJdbcStatement { statement => + withJdbcStatement("test_map") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_map", "CREATE TABLE test_map(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") @@ -251,9 +242,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-12143 regression: Binary type support") { - withJdbcStatement { statement => + withJdbcStatement("test_binary") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_binary", "CREATE TABLE test_binary(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_binary") @@ -262,7 +252,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val expected: Array[Byte] = "val_238".getBytes assertResult(expected) { val resultSet = statement.executeQuery( - "SELECT CAST(value as BINARY) FROM test_date LIMIT 1") + "SELECT CAST(value as BINARY) FROM test_binary LIMIT 1") resultSet.next() resultSet.getObject(1) } @@ -275,12 +265,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { var defaultV2: String = null var data: ArrayBuffer[Int] = null - withMultipleConnectionJdbcStatement( + withMultipleConnectionJdbcStatement("test_map")( // create table { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_map", "CREATE TABLE test_map(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map", "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC", @@ -418,9 +407,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { // This test often hangs and then times out, leaving the hanging processes. // Let's ignore it and improve the test. ignore("test jdbc cancel") { - withJdbcStatement { statement => + withJdbcStatement("test_map") { statement => val queries = Seq( - "DROP TABLE IF EXISTS test_map", "CREATE TABLE test_map(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") @@ -478,7 +466,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("test add jar") { - withMultipleConnectionJdbcStatement( + withMultipleConnectionJdbcStatement("smallKV", "addJar")( { statement => val jarFile = @@ -492,10 +480,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { { statement => val queries = Seq( - "DROP TABLE IF EXISTS smallKV", "CREATE TABLE smallKV(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE smallKV", - "DROP TABLE IF EXISTS addJar", """CREATE TABLE addJar(key string) |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' """.stripMargin) @@ -524,15 +510,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { expectedResult.close() assert(expectedResultBuffer === actualResultBuffer) - - statement.executeQuery("DROP TABLE IF EXISTS addJar") - statement.executeQuery("DROP TABLE IF EXISTS smallKV") } ) } test("Checks Hive version via SET -v") { - withJdbcStatement { statement => + withJdbcStatement() { statement => val resultSet = statement.executeQuery("SET -v") val conf = mutable.Map.empty[String, String] @@ -545,7 +528,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("Checks Hive version via SET") { - withJdbcStatement { statement => + withJdbcStatement() { statement => val resultSet = statement.executeQuery("SET") val conf = mutable.Map.empty[String, String] @@ -558,7 +541,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } test("SPARK-11595 ADD JAR with input path having URL scheme") { - withJdbcStatement { statement => + withJdbcStatement("test_udtf") { statement => try { val jarPath = "../hive/src/test/resources/TestUDTF.jar" val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" @@ -586,7 +569,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val dataPath = "../hive/src/test/resources/data/files/kv1.txt" Seq( - s"CREATE TABLE test_udtf(key INT, value STRING)", + "CREATE TABLE test_udtf(key INT, value STRING)", s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf" ).foreach(statement.execute) @@ -624,8 +607,8 @@ class SingleSessionSuite extends HiveThriftJdbcTest { override protected def extraConf: Seq[String] = "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil - test("test single session") { - withMultipleConnectionJdbcStatement( + test("share the temporary functions across JDBC connections") { + withMultipleConnectionJdbcStatement()( { statement => val jarPath = "../hive/src/test/resources/TestUDTF.jar" val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" @@ -667,16 +650,63 @@ class SingleSessionSuite extends HiveThriftJdbcTest { } ) } + + test("unable to changing spark.sql.hive.thriftServer.singleSession using JDBC connections") { + withJdbcStatement() { statement => + // JDBC connections are not able to set the conf spark.sql.hive.thriftServer.singleSession + val e = intercept[SQLException] { + statement.executeQuery("SET spark.sql.hive.thriftServer.singleSession=false") + }.getMessage + assert(e.contains( + "Cannot modify the value of a static config: spark.sql.hive.thriftServer.singleSession")) + } + } + + test("share the current database and temporary tables across JDBC connections") { + withMultipleConnectionJdbcStatement()( + { statement => + statement.execute("CREATE DATABASE IF NOT EXISTS db1") + }, + + { statement => + val rs1 = statement.executeQuery("SELECT current_database()") + assert(rs1.next()) + assert(rs1.getString(1) === "default") + + statement.execute("USE db1") + + val rs2 = statement.executeQuery("SELECT current_database()") + assert(rs2.next()) + assert(rs2.getString(1) === "db1") + + statement.execute("CREATE TEMP VIEW tempView AS SELECT 123") + }, + + { statement => + // the current database is set to db1 by another JDBC connection. + val rs1 = statement.executeQuery("SELECT current_database()") + assert(rs1.next()) + assert(rs1.getString(1) === "db1") + + val rs2 = statement.executeQuery("SELECT * from tempView") + assert(rs2.next()) + assert(rs2.getString(1) === "123") + + statement.execute("USE default") + statement.execute("DROP VIEW tempView") + statement.execute("DROP DATABASE db1 CASCADE") + } + ) + } } class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { override def mode: ServerMode.Value = ServerMode.http test("JDBC query execution") { - withJdbcStatement { statement => + withJdbcStatement("test") { statement => val queries = Seq( "SET spark.sql.shuffle.partitions=3", - "DROP TABLE IF EXISTS test", "CREATE TABLE test(key INT, val STRING)", s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test", "CACHE TABLE test") @@ -692,7 +722,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { } test("Checks Hive version") { - withJdbcStatement { statement => + withJdbcStatement() { statement => val resultSet = statement.executeQuery("SET spark.sql.hive.version") resultSet.next() assert(resultSet.getString(1) === "spark.sql.hive.version") @@ -718,7 +748,7 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test { s"jdbc:hive2://localhost:$serverPort/" } - def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) { + def withMultipleConnectionJdbcStatement(tableNames: String*)(fs: (Statement => Unit)*) { val user = System.getProperty("user.name") val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") } val statements = connections.map(_.createStatement()) @@ -726,13 +756,16 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test { try { statements.zip(fs).foreach { case (s, f) => f(s) } } finally { + tableNames.foreach { name => + statements(0).execute(s"DROP TABLE IF EXISTS $name") + } statements.foreach(_.close()) connections.foreach(_.close()) } } - def withJdbcStatement(f: Statement => Unit) { - withMultipleConnectionJdbcStatement(f) + def withJdbcStatement(tableNames: String*)(f: Statement => Unit) { + withMultipleConnectionJdbcStatement(tableNames: _*)(f) } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala index bf431cd6b0..4c53dd8f46 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala @@ -74,7 +74,7 @@ class UISeleniumSuite } ignore("thrift server ui test") { - withJdbcStatement { statement => + withJdbcStatement("test_map") { statement => val baseURL = s"http://localhost:$uiPort" val queries = Seq( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 6d4fe1a941..bea073fb48 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -141,10 +141,4 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) } - // TODO: why do we get this from SparkConf but not SQLConf? - def hiveThriftServerSingleSession: Boolean = { - sparkSession.sparkContext.conf.getBoolean( - "spark.sql.hive.thriftServer.singleSession", defaultValue = false) - } - } |