aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-12-28 10:16:22 +0800
committerWenchen Fan <wenchen@databricks.com>2016-12-28 10:16:22 +0800
commit5ac62043cf6dc12c986e5ae9d9661fd439f8b5b9 (patch)
tree6307b2701a86255ed3d34982e99b0ad53a3a1ccb /sql/hive-thriftserver
parent28ab0ec49fa9bac1c4a246a44a5d1ad163660e1a (diff)
downloadspark-5ac62043cf6dc12c986e5ae9d9661fd439f8b5b9.tar.gz
spark-5ac62043cf6dc12c986e5ae9d9661fd439f8b5b9.tar.bz2
spark-5ac62043cf6dc12c986e5ae9d9661fd439f8b5b9.zip
[SPARK-18992][SQL] Move spark.sql.hive.thriftServer.singleSession to SQLConf
### What changes were proposed in this pull request? Since `spark.sql.hive.thriftServer.singleSession` is a configuration of SQL component, this conf can be moved from `SparkConf` to `StaticSQLConf`. When we introduced `spark.sql.hive.thriftServer.singleSession`, all the SQL configuration are session specific. They can be modified in different sessions. In Spark 2.1, static SQL configuration is added. It is a perfect fit for `spark.sql.hive.thriftServer.singleSession`. Previously, we did the same move for `spark.sql.warehouse.dir` from `SparkConf` to `StaticSQLConf` ### How was this patch tested? Added test cases in HiveThriftServer2Suites.scala Author: gatorsmile <gatorsmile@gmail.com> Closes #16392 from gatorsmile/hiveThriftServerSingleSession.
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala5
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala115
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala2
3 files changed, 77 insertions, 45 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 226b7e175a..7adaafe5ad 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -28,7 +28,7 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.hive.service.server.HiveServer2
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
@@ -72,8 +72,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
- val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
- val ctx = if (sessionState.hiveThriftServerSingleSession) {
+ val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
sqlContext
} else {
sqlContext.newSession()
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 5d20ec958c..b6215bde6b 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -98,9 +98,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val user = System.getProperty("user.name")
val sessionHandle = client.openSession(user, "")
- withJdbcStatement { statement =>
+ withJdbcStatement("test_16563") { statement =>
val queries = Seq(
- "DROP TABLE IF EXISTS test_16563",
"CREATE TABLE test_16563(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_16563")
@@ -134,16 +133,14 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
rows_first.numRows()
}
- statement.executeQuery("DROP TABLE IF EXISTS test_16563")
}
}
}
test("JDBC query execution") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test") { statement =>
val queries = Seq(
"SET spark.sql.shuffle.partitions=3",
- "DROP TABLE IF EXISTS test",
"CREATE TABLE test(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
"CACHE TABLE test")
@@ -159,7 +156,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
test("Checks Hive version") {
- withJdbcStatement { statement =>
+ withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("SET spark.sql.hive.version")
resultSet.next()
assert(resultSet.getString(1) === "spark.sql.hive.version")
@@ -168,9 +165,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
test("SPARK-3004 regression: result set containing NULL") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test_null") { statement =>
val queries = Seq(
- "DROP TABLE IF EXISTS test_null",
"CREATE TABLE test_null(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")
@@ -189,9 +185,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
test("SPARK-4292 regression: result set iterator issue") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test_4292") { statement =>
val queries = Seq(
- "DROP TABLE IF EXISTS test_4292",
"CREATE TABLE test_4292(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")
@@ -203,15 +198,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
resultSet.next()
assert(resultSet.getInt(1) === key)
}
-
- statement.executeQuery("DROP TABLE IF EXISTS test_4292")
}
}
test("SPARK-4309 regression: Date type support") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test_date") { statement =>
val queries = Seq(
- "DROP TABLE IF EXISTS test_date",
"CREATE TABLE test_date(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")
@@ -227,9 +219,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
test("SPARK-4407 regression: Complex type support") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test_map") { statement =>
val queries = Seq(
- "DROP TABLE IF EXISTS test_map",
"CREATE TABLE test_map(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
@@ -251,9 +242,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
test("SPARK-12143 regression: Binary type support") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test_binary") { statement =>
val queries = Seq(
- "DROP TABLE IF EXISTS test_binary",
"CREATE TABLE test_binary(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_binary")
@@ -262,7 +252,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val expected: Array[Byte] = "val_238".getBytes
assertResult(expected) {
val resultSet = statement.executeQuery(
- "SELECT CAST(value as BINARY) FROM test_date LIMIT 1")
+ "SELECT CAST(value as BINARY) FROM test_binary LIMIT 1")
resultSet.next()
resultSet.getObject(1)
}
@@ -275,12 +265,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
var defaultV2: String = null
var data: ArrayBuffer[Int] = null
- withMultipleConnectionJdbcStatement(
+ withMultipleConnectionJdbcStatement("test_map")(
// create table
{ statement =>
val queries = Seq(
- "DROP TABLE IF EXISTS test_map",
"CREATE TABLE test_map(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map",
"CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC",
@@ -418,9 +407,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
// This test often hangs and then times out, leaving the hanging processes.
// Let's ignore it and improve the test.
ignore("test jdbc cancel") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test_map") { statement =>
val queries = Seq(
- "DROP TABLE IF EXISTS test_map",
"CREATE TABLE test_map(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
@@ -478,7 +466,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
test("test add jar") {
- withMultipleConnectionJdbcStatement(
+ withMultipleConnectionJdbcStatement("smallKV", "addJar")(
{
statement =>
val jarFile =
@@ -492,10 +480,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
{
statement =>
val queries = Seq(
- "DROP TABLE IF EXISTS smallKV",
"CREATE TABLE smallKV(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE smallKV",
- "DROP TABLE IF EXISTS addJar",
"""CREATE TABLE addJar(key string)
|ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
""".stripMargin)
@@ -524,15 +510,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
expectedResult.close()
assert(expectedResultBuffer === actualResultBuffer)
-
- statement.executeQuery("DROP TABLE IF EXISTS addJar")
- statement.executeQuery("DROP TABLE IF EXISTS smallKV")
}
)
}
test("Checks Hive version via SET -v") {
- withJdbcStatement { statement =>
+ withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("SET -v")
val conf = mutable.Map.empty[String, String]
@@ -545,7 +528,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
test("Checks Hive version via SET") {
- withJdbcStatement { statement =>
+ withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("SET")
val conf = mutable.Map.empty[String, String]
@@ -558,7 +541,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
test("SPARK-11595 ADD JAR with input path having URL scheme") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test_udtf") { statement =>
try {
val jarPath = "../hive/src/test/resources/TestUDTF.jar"
val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
@@ -586,7 +569,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
Seq(
- s"CREATE TABLE test_udtf(key INT, value STRING)",
+ "CREATE TABLE test_udtf(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf"
).foreach(statement.execute)
@@ -624,8 +607,8 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
override protected def extraConf: Seq[String] =
"--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
- test("test single session") {
- withMultipleConnectionJdbcStatement(
+ test("share the temporary functions across JDBC connections") {
+ withMultipleConnectionJdbcStatement()(
{ statement =>
val jarPath = "../hive/src/test/resources/TestUDTF.jar"
val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
@@ -667,16 +650,63 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
}
)
}
+
+ test("unable to changing spark.sql.hive.thriftServer.singleSession using JDBC connections") {
+ withJdbcStatement() { statement =>
+ // JDBC connections are not able to set the conf spark.sql.hive.thriftServer.singleSession
+ val e = intercept[SQLException] {
+ statement.executeQuery("SET spark.sql.hive.thriftServer.singleSession=false")
+ }.getMessage
+ assert(e.contains(
+ "Cannot modify the value of a static config: spark.sql.hive.thriftServer.singleSession"))
+ }
+ }
+
+ test("share the current database and temporary tables across JDBC connections") {
+ withMultipleConnectionJdbcStatement()(
+ { statement =>
+ statement.execute("CREATE DATABASE IF NOT EXISTS db1")
+ },
+
+ { statement =>
+ val rs1 = statement.executeQuery("SELECT current_database()")
+ assert(rs1.next())
+ assert(rs1.getString(1) === "default")
+
+ statement.execute("USE db1")
+
+ val rs2 = statement.executeQuery("SELECT current_database()")
+ assert(rs2.next())
+ assert(rs2.getString(1) === "db1")
+
+ statement.execute("CREATE TEMP VIEW tempView AS SELECT 123")
+ },
+
+ { statement =>
+ // the current database is set to db1 by another JDBC connection.
+ val rs1 = statement.executeQuery("SELECT current_database()")
+ assert(rs1.next())
+ assert(rs1.getString(1) === "db1")
+
+ val rs2 = statement.executeQuery("SELECT * from tempView")
+ assert(rs2.next())
+ assert(rs2.getString(1) === "123")
+
+ statement.execute("USE default")
+ statement.execute("DROP VIEW tempView")
+ statement.execute("DROP DATABASE db1 CASCADE")
+ }
+ )
+ }
}
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
override def mode: ServerMode.Value = ServerMode.http
test("JDBC query execution") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test") { statement =>
val queries = Seq(
"SET spark.sql.shuffle.partitions=3",
- "DROP TABLE IF EXISTS test",
"CREATE TABLE test(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
"CACHE TABLE test")
@@ -692,7 +722,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
}
test("Checks Hive version") {
- withJdbcStatement { statement =>
+ withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("SET spark.sql.hive.version")
resultSet.next()
assert(resultSet.getString(1) === "spark.sql.hive.version")
@@ -718,7 +748,7 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
s"jdbc:hive2://localhost:$serverPort/"
}
- def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) {
+ def withMultipleConnectionJdbcStatement(tableNames: String*)(fs: (Statement => Unit)*) {
val user = System.getProperty("user.name")
val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
val statements = connections.map(_.createStatement())
@@ -726,13 +756,16 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
try {
statements.zip(fs).foreach { case (s, f) => f(s) }
} finally {
+ tableNames.foreach { name =>
+ statements(0).execute(s"DROP TABLE IF EXISTS $name")
+ }
statements.foreach(_.close())
connections.foreach(_.close())
}
}
- def withJdbcStatement(f: Statement => Unit) {
- withMultipleConnectionJdbcStatement(f)
+ def withJdbcStatement(tableNames: String*)(f: Statement => Unit) {
+ withMultipleConnectionJdbcStatement(tableNames: _*)(f)
}
}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
index bf431cd6b0..4c53dd8f46 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
@@ -74,7 +74,7 @@ class UISeleniumSuite
}
ignore("thrift server ui test") {
- withJdbcStatement { statement =>
+ withJdbcStatement("test_map") { statement =>
val baseURL = s"http://localhost:$uiPort"
val queries = Seq(