diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-03-17 01:09:27 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-03-17 01:09:27 +0800 |
commit | 12a345adcbaee359199ddfed4f41bf0e19d66d48 (patch) | |
tree | 3e1a93a7952024af0c14945efd577c79461231c2 /sql/hive-thriftserver/src | |
parent | 00e730b94cba1202a73af1e2476ff5a44af4b6b2 (diff) | |
download | spark-12a345adcbaee359199ddfed4f41bf0e19d66d48.tar.gz spark-12a345adcbaee359199ddfed4f41bf0e19d66d48.tar.bz2 spark-12a345adcbaee359199ddfed4f41bf0e19d66d48.zip |
[SPARK-2087] [SQL] Multiple thriftserver sessions with single HiveContext instance
Still, we keep only a single HiveContext within ThriftServer, and we also create a object called `SQLSession` for isolating the different user states.
Developers can obtain/release a new user session via `openSession` and `closeSession`, and `SQLContext` and `HiveContext` will also provide a default session if no `openSession` called, for backward-compatibility.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #4885 from chenghao-intel/multisessions_singlecontext and squashes the following commits:
1c47b2a [Cheng Hao] rename the tss => tlSession
815b27a [Cheng Hao] code style issue
57e3fa0 [Cheng Hao] openSession is not compatible between Hive0.12 & 0.13.1
4665b0d [Cheng Hao] thriftservice with single context
Diffstat (limited to 'sql/hive-thriftserver/src')
2 files changed, 154 insertions, 63 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala deleted file mode 100644 index 89e9ede726..0000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.thriftserver - -import java.util.concurrent.Executors - -import org.apache.commons.logging.Log -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hive.service.cli.session.SessionManager - -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager -import org.apache.hive.service.cli.SessionHandle - -private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) - extends SessionManager - with ReflectedCompositeService { - - private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) - - override def init(hiveConf: HiveConf) { - setSuperField(this, "hiveConf", hiveConf) - - val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) - setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize)) - getAncestorField[Log](this, 3, "LOG").info( - s"HiveServer2: Async execution pool size $backgroundPoolSize") - - setSuperField(this, "operationManager", sparkSqlOperationManager) - addService(sparkSqlOperationManager) - - initCompositeService(hiveConf) - } - - override def closeSession(sessionHandle: SessionHandle) { - super.closeSession(sessionHandle) - sparkSqlOperationManager.sessionToActivePool -= sessionHandle - } -} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index d783d487b5..aff96e21a5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -195,6 +195,146 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } } } + + test("test multiple session") { + import org.apache.spark.sql.SQLConf + var defaultV1: String = null + var defaultV2: String = null + + withMultipleConnectionJdbcStatement( + // create table + { statement => + + val queries = Seq( + "DROP TABLE IF EXISTS test_map", + "CREATE TABLE test_map(key INT, value STRING)", + s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map", + "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC") + + queries.foreach(statement.execute) + + val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC") + val buf1 = new collection.mutable.ArrayBuffer[Int]() + while (rs1.next()) { + buf1 += rs1.getInt(1) + } + rs1.close() + + val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") + val buf2 = new collection.mutable.ArrayBuffer[Int]() + while (rs2.next()) { + buf2 += rs2.getInt(1) + } + rs2.close() + + assert(buf1 === buf2) + }, + + // first session, we get the default value of the session status + { statement => + + val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}") + rs1.next() + defaultV1 = rs1.getString(1) + assert(defaultV1 != "200") + rs1.close() + + val rs2 = statement.executeQuery("SET hive.cli.print.header") + rs2.next() + + defaultV2 = rs2.getString(1) + assert(defaultV1 != "true") + rs2.close() + }, + + // second session, we update the session status + { statement => + + val queries = Seq( + s"SET ${SQLConf.SHUFFLE_PARTITIONS}=291", + "SET hive.cli.print.header=true" + ) + + queries.map(statement.execute) + val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}") + rs1.next() + assert("spark.sql.shuffle.partitions=291" === rs1.getString(1)) + rs1.close() + + val rs2 = statement.executeQuery("SET hive.cli.print.header") + rs2.next() + assert("hive.cli.print.header=true" === rs2.getString(1)) + rs2.close() + }, + + // third session, we get the latest session status, supposed to be the + // default value + { statement => + + val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}") + rs1.next() + assert(defaultV1 === rs1.getString(1)) + rs1.close() + + val rs2 = statement.executeQuery("SET hive.cli.print.header") + rs2.next() + assert(defaultV2 === rs2.getString(1)) + rs2.close() + }, + + // accessing the cached data in another session + { statement => + + val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC") + val buf1 = new collection.mutable.ArrayBuffer[Int]() + while (rs1.next()) { + buf1 += rs1.getInt(1) + } + rs1.close() + + val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") + val buf2 = new collection.mutable.ArrayBuffer[Int]() + while (rs2.next()) { + buf2 += rs2.getInt(1) + } + rs2.close() + + assert(buf1 === buf2) + statement.executeQuery("UNCACHE TABLE test_table") + + // TODO need to figure out how to determine if the data loaded from cache + val rs3 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") + val buf3 = new collection.mutable.ArrayBuffer[Int]() + while (rs3.next()) { + buf3 += rs3.getInt(1) + } + rs3.close() + + assert(buf1 === buf3) + }, + + // accessing the uncached table + { statement => + + // TODO need to figure out how to determine if the data loaded from cache + val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC") + val buf1 = new collection.mutable.ArrayBuffer[Int]() + while (rs1.next()) { + buf1 += rs1.getInt(1) + } + rs1.close() + + val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") + val buf2 = new collection.mutable.ArrayBuffer[Int]() + while (rs2.next()) { + buf2 += rs2.getInt(1) + } + rs2.close() + + assert(buf1 === buf2) + } + ) + } } class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { @@ -245,15 +385,22 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test { s"jdbc:hive2://localhost:$serverPort/" } - protected def withJdbcStatement(f: Statement => Unit): Unit = { - val connection = DriverManager.getConnection(jdbcUri, user, "") - val statement = connection.createStatement() - - try f(statement) finally { - statement.close() - connection.close() + def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) { + val user = System.getProperty("user.name") + val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") } + val statements = connections.map(_.createStatement()) + + try { + statements.zip(fs).map { case (s, f) => f(s) } + } finally { + statements.map(_.close()) + connections.map(_.close()) } } + + def withJdbcStatement(f: Statement => Unit) { + withMultipleConnectionJdbcStatement(f) + } } abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll with Logging { |