aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver/src/test
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2015-03-17 01:09:27 +0800
committerCheng Lian <lian@databricks.com>2015-03-17 01:09:27 +0800
commit12a345adcbaee359199ddfed4f41bf0e19d66d48 (patch)
tree3e1a93a7952024af0c14945efd577c79461231c2 /sql/hive-thriftserver/src/test
parent00e730b94cba1202a73af1e2476ff5a44af4b6b2 (diff)
downloadspark-12a345adcbaee359199ddfed4f41bf0e19d66d48.tar.gz
spark-12a345adcbaee359199ddfed4f41bf0e19d66d48.tar.bz2
spark-12a345adcbaee359199ddfed4f41bf0e19d66d48.zip
[SPARK-2087] [SQL] Multiple thriftserver sessions with single HiveContext instance
Still, we keep only a single HiveContext within ThriftServer, and we also create a object called `SQLSession` for isolating the different user states. Developers can obtain/release a new user session via `openSession` and `closeSession`, and `SQLContext` and `HiveContext` will also provide a default session if no `openSession` called, for backward-compatibility. Author: Cheng Hao <hao.cheng@intel.com> Closes #4885 from chenghao-intel/multisessions_singlecontext and squashes the following commits: 1c47b2a [Cheng Hao] rename the tss => tlSession 815b27a [Cheng Hao] code style issue 57e3fa0 [Cheng Hao] openSession is not compatible between Hive0.12 & 0.13.1 4665b0d [Cheng Hao] thriftservice with single context
Diffstat (limited to 'sql/hive-thriftserver/src/test')
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala161
1 files changed, 154 insertions, 7 deletions
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index d783d487b5..aff96e21a5 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -195,6 +195,146 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
}
}
+
+ test("test multiple session") {
+ import org.apache.spark.sql.SQLConf
+ var defaultV1: String = null
+ var defaultV2: String = null
+
+ withMultipleConnectionJdbcStatement(
+ // create table
+ { statement =>
+
+ val queries = Seq(
+ "DROP TABLE IF EXISTS test_map",
+ "CREATE TABLE test_map(key INT, value STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map",
+ "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC")
+
+ queries.foreach(statement.execute)
+
+ val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
+ val buf1 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs1.next()) {
+ buf1 += rs1.getInt(1)
+ }
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf2 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs2.next()) {
+ buf2 += rs2.getInt(1)
+ }
+ rs2.close()
+
+ assert(buf1 === buf2)
+ },
+
+ // first session, we get the default value of the session status
+ { statement =>
+
+ val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+ rs1.next()
+ defaultV1 = rs1.getString(1)
+ assert(defaultV1 != "200")
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SET hive.cli.print.header")
+ rs2.next()
+
+ defaultV2 = rs2.getString(1)
+ assert(defaultV1 != "true")
+ rs2.close()
+ },
+
+ // second session, we update the session status
+ { statement =>
+
+ val queries = Seq(
+ s"SET ${SQLConf.SHUFFLE_PARTITIONS}=291",
+ "SET hive.cli.print.header=true"
+ )
+
+ queries.map(statement.execute)
+ val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+ rs1.next()
+ assert("spark.sql.shuffle.partitions=291" === rs1.getString(1))
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SET hive.cli.print.header")
+ rs2.next()
+ assert("hive.cli.print.header=true" === rs2.getString(1))
+ rs2.close()
+ },
+
+ // third session, we get the latest session status, supposed to be the
+ // default value
+ { statement =>
+
+ val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+ rs1.next()
+ assert(defaultV1 === rs1.getString(1))
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SET hive.cli.print.header")
+ rs2.next()
+ assert(defaultV2 === rs2.getString(1))
+ rs2.close()
+ },
+
+ // accessing the cached data in another session
+ { statement =>
+
+ val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
+ val buf1 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs1.next()) {
+ buf1 += rs1.getInt(1)
+ }
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf2 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs2.next()) {
+ buf2 += rs2.getInt(1)
+ }
+ rs2.close()
+
+ assert(buf1 === buf2)
+ statement.executeQuery("UNCACHE TABLE test_table")
+
+ // TODO need to figure out how to determine if the data loaded from cache
+ val rs3 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf3 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs3.next()) {
+ buf3 += rs3.getInt(1)
+ }
+ rs3.close()
+
+ assert(buf1 === buf3)
+ },
+
+ // accessing the uncached table
+ { statement =>
+
+ // TODO need to figure out how to determine if the data loaded from cache
+ val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
+ val buf1 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs1.next()) {
+ buf1 += rs1.getInt(1)
+ }
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf2 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs2.next()) {
+ buf2 += rs2.getInt(1)
+ }
+ rs2.close()
+
+ assert(buf1 === buf2)
+ }
+ )
+ }
}
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
@@ -245,15 +385,22 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
s"jdbc:hive2://localhost:$serverPort/"
}
- protected def withJdbcStatement(f: Statement => Unit): Unit = {
- val connection = DriverManager.getConnection(jdbcUri, user, "")
- val statement = connection.createStatement()
-
- try f(statement) finally {
- statement.close()
- connection.close()
+ def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) {
+ val user = System.getProperty("user.name")
+ val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
+ val statements = connections.map(_.createStatement())
+
+ try {
+ statements.zip(fs).map { case (s, f) => f(s) }
+ } finally {
+ statements.map(_.close())
+ connections.map(_.close())
}
}
+
+ def withJdbcStatement(f: Statement => Unit) {
+ withMultipleConnectionJdbcStatement(f)
+ }
}
abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll with Logging {