diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-03-17 01:09:27 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-03-17 01:09:27 +0800 |
commit | 12a345adcbaee359199ddfed4f41bf0e19d66d48 (patch) | |
tree | 3e1a93a7952024af0c14945efd577c79461231c2 /sql/hive-thriftserver/src/test | |
parent | 00e730b94cba1202a73af1e2476ff5a44af4b6b2 (diff) | |
download | spark-12a345adcbaee359199ddfed4f41bf0e19d66d48.tar.gz spark-12a345adcbaee359199ddfed4f41bf0e19d66d48.tar.bz2 spark-12a345adcbaee359199ddfed4f41bf0e19d66d48.zip |
[SPARK-2087] [SQL] Multiple thriftserver sessions with single HiveContext instance
Still, we keep only a single HiveContext within ThriftServer, and we also create a object called `SQLSession` for isolating the different user states.
Developers can obtain/release a new user session via `openSession` and `closeSession`, and `SQLContext` and `HiveContext` will also provide a default session if no `openSession` called, for backward-compatibility.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #4885 from chenghao-intel/multisessions_singlecontext and squashes the following commits:
1c47b2a [Cheng Hao] rename the tss => tlSession
815b27a [Cheng Hao] code style issue
57e3fa0 [Cheng Hao] openSession is not compatible between Hive0.12 & 0.13.1
4665b0d [Cheng Hao] thriftservice with single context
Diffstat (limited to 'sql/hive-thriftserver/src/test')
-rw-r--r-- | sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 161 |
1 files changed, 154 insertions, 7 deletions
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index d783d487b5..aff96e21a5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -195,6 +195,146 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } } } + + test("test multiple session") { + import org.apache.spark.sql.SQLConf + var defaultV1: String = null + var defaultV2: String = null + + withMultipleConnectionJdbcStatement( + // create table + { statement => + + val queries = Seq( + "DROP TABLE IF EXISTS test_map", + "CREATE TABLE test_map(key INT, value STRING)", + s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map", + "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC") + + queries.foreach(statement.execute) + + val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC") + val buf1 = new collection.mutable.ArrayBuffer[Int]() + while (rs1.next()) { + buf1 += rs1.getInt(1) + } + rs1.close() + + val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") + val buf2 = new collection.mutable.ArrayBuffer[Int]() + while (rs2.next()) { + buf2 += rs2.getInt(1) + } + rs2.close() + + assert(buf1 === buf2) + }, + + // first session, we get the default value of the session status + { statement => + + val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}") + rs1.next() + defaultV1 = rs1.getString(1) + assert(defaultV1 != "200") + rs1.close() + + val rs2 = statement.executeQuery("SET hive.cli.print.header") + rs2.next() + + defaultV2 = rs2.getString(1) + assert(defaultV1 != "true") + rs2.close() + }, + + // second session, we update the session status + { statement => + + val queries = Seq( + s"SET ${SQLConf.SHUFFLE_PARTITIONS}=291", + "SET hive.cli.print.header=true" + ) + + queries.map(statement.execute) + val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}") + rs1.next() + assert("spark.sql.shuffle.partitions=291" === rs1.getString(1)) + rs1.close() + + val rs2 = statement.executeQuery("SET hive.cli.print.header") + rs2.next() + assert("hive.cli.print.header=true" === rs2.getString(1)) + rs2.close() + }, + + // third session, we get the latest session status, supposed to be the + // default value + { statement => + + val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}") + rs1.next() + assert(defaultV1 === rs1.getString(1)) + rs1.close() + + val rs2 = statement.executeQuery("SET hive.cli.print.header") + rs2.next() + assert(defaultV2 === rs2.getString(1)) + rs2.close() + }, + + // accessing the cached data in another session + { statement => + + val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC") + val buf1 = new collection.mutable.ArrayBuffer[Int]() + while (rs1.next()) { + buf1 += rs1.getInt(1) + } + rs1.close() + + val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") + val buf2 = new collection.mutable.ArrayBuffer[Int]() + while (rs2.next()) { + buf2 += rs2.getInt(1) + } + rs2.close() + + assert(buf1 === buf2) + statement.executeQuery("UNCACHE TABLE test_table") + + // TODO need to figure out how to determine if the data loaded from cache + val rs3 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") + val buf3 = new collection.mutable.ArrayBuffer[Int]() + while (rs3.next()) { + buf3 += rs3.getInt(1) + } + rs3.close() + + assert(buf1 === buf3) + }, + + // accessing the uncached table + { statement => + + // TODO need to figure out how to determine if the data loaded from cache + val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC") + val buf1 = new collection.mutable.ArrayBuffer[Int]() + while (rs1.next()) { + buf1 += rs1.getInt(1) + } + rs1.close() + + val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") + val buf2 = new collection.mutable.ArrayBuffer[Int]() + while (rs2.next()) { + buf2 += rs2.getInt(1) + } + rs2.close() + + assert(buf1 === buf2) + } + ) + } } class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { @@ -245,15 +385,22 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test { s"jdbc:hive2://localhost:$serverPort/" } - protected def withJdbcStatement(f: Statement => Unit): Unit = { - val connection = DriverManager.getConnection(jdbcUri, user, "") - val statement = connection.createStatement() - - try f(statement) finally { - statement.close() - connection.close() + def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) { + val user = System.getProperty("user.name") + val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") } + val statements = connections.map(_.createStatement()) + + try { + statements.zip(fs).map { case (s, f) => f(s) } + } finally { + statements.map(_.close()) + connections.map(_.close()) } } + + def withJdbcStatement(f: Statement => Unit) { + withMultipleConnectionJdbcStatement(f) + } } abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll with Logging { |