path: root/sql/hive-thriftserver/src
author     Cheng Hao <hao.cheng@intel.com>     2015-03-17 01:09:27 +0800
committer  Cheng Lian <lian@databricks.com>    2015-03-17 01:09:27 +0800
commit     12a345adcbaee359199ddfed4f41bf0e19d66d48 (patch)
tree       3e1a93a7952024af0c14945efd577c79461231c2 /sql/hive-thriftserver/src
parent     00e730b94cba1202a73af1e2476ff5a44af4b6b2 (diff)
[SPARK-2087] [SQL] Multiple thriftserver sessions with single HiveContext instance
Still, we keep only a single HiveContext within the Thrift server, and we introduce an object called `SQLSession` to isolate per-user state. Developers can obtain/release a new user session via `openSession` and `closeSession`; for backward compatibility, `SQLContext` and `HiveContext` also provide a default session when `openSession` is never called.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #4885 from chenghao-intel/multisessions_singlecontext and squashes the following commits:

1c47b2a [Cheng Hao] rename the tss => tlSession
815b27a [Cheng Hao] code style issue
57e3fa0 [Cheng Hao] openSession is not compatible between Hive0.12 & 0.13.1
4665b0d [Cheng Hao] thriftservice with single context
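As a rough sketch of the mechanism described above (illustrative only, not the actual Spark code: `SQLSession` and `tlSession` are named in the commit, while `SessionIsolatedContext` and its `conf` map are stand-ins), per-user state can be isolated behind a thread-local handle while a single shared context remains:

import java.util.concurrent.ConcurrentHashMap

// Each session keeps its own copy of mutable settings such as
// spark.sql.shuffle.partitions or hive.cli.print.header.
class SQLSession {
  val conf = new ConcurrentHashMap[String, String]()
}

class SessionIsolatedContext {
  // The default session keeps things backward-compatible when
  // openSession is never called.
  private val defaultSession = new SQLSession
  private val tlSession = new ThreadLocal[SQLSession] {
    override def initialValue(): SQLSession = defaultSession
  }

  // Create a fresh session and attach it to the calling thread.
  def openSession(): SQLSession = {
    val session = new SQLSession
    tlSession.set(session)
    session
  }

  def currentSession: SQLSession = tlSession.get()

  // Release the session and fall back to the default session.
  def closeSession(session: SQLSession): Unit = {
    require(tlSession.get() eq session, "can only close the current thread's session")
    tlSession.remove()
  }
}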
Diffstat (limited to 'sql/hive-thriftserver/src')
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala   56
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala  161
2 files changed, 154 insertions, 63 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
deleted file mode 100644
index 89e9ede726..0000000000
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver
-
-import java.util.concurrent.Executors
-
-import org.apache.commons.logging.Log
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hive.service.cli.session.SessionManager
-
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
-import org.apache.hive.service.cli.SessionHandle
-
-private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
- extends SessionManager
- with ReflectedCompositeService {
-
- private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
-
- override def init(hiveConf: HiveConf) {
- setSuperField(this, "hiveConf", hiveConf)
-
- val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
- setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
- getAncestorField[Log](this, 3, "LOG").info(
- s"HiveServer2: Async execution pool size $backgroundPoolSize")
-
- setSuperField(this, "operationManager", sparkSqlOperationManager)
- addService(sparkSqlOperationManager)
-
- initCompositeService(hiveConf)
- }
-
- override def closeSession(sessionHandle: SessionHandle) {
- super.closeSession(sessionHandle)
- sparkSqlOperationManager.sessionToActivePool -= sessionHandle
- }
-}
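Within the path shown, the session manager is simply deleted; given the commit note that `openSession` is not compatible between Hive 0.12 and 0.13.1, the replacement presumably lives in per-version source trees outside this directory. A hypothetical sketch of such a copy follows (the `HiveContextLike` trait and its `detachSession` hook are assumptions for illustration, not confirmed API; the version-specific `openSession` override is deliberately omitted since its signature is what differs between Hive versions):

import org.apache.hive.service.cli.SessionHandle
import org.apache.hive.service.cli.session.SessionManager

// Stand-in for the slice of HiveContext this sketch assumes.
trait HiveContextLike {
  def openSession(): Unit   // create and attach per-session state
  def detachSession(): Unit // drop the current thread's session state
}

// Hypothetical per-version session manager tying the Hive session
// lifecycle to per-session state on the shared context.
class SketchSessionManager(hiveContext: HiveContextLike) extends SessionManager {
  override def closeSession(sessionHandle: SessionHandle): Unit = {
    super.closeSession(sessionHandle)
    hiveContext.detachSession() // assumed hook, paired with openSession
  }
}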
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index d783d487b5..aff96e21a5 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -195,6 +195,146 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
}
}
+
+ test("test multiple session") {
+ import org.apache.spark.sql.SQLConf
+ var defaultV1: String = null
+ var defaultV2: String = null
+
+ withMultipleConnectionJdbcStatement(
+ // create table
+ { statement =>
+
+ val queries = Seq(
+ "DROP TABLE IF EXISTS test_map",
+ "CREATE TABLE test_map(key INT, value STRING)",
+ s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map",
+ "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC")
+
+ queries.foreach(statement.execute)
+
+ val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
+ val buf1 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs1.next()) {
+ buf1 += rs1.getInt(1)
+ }
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf2 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs2.next()) {
+ buf2 += rs2.getInt(1)
+ }
+ rs2.close()
+
+ assert(buf1 === buf2)
+ },
+
+ // first session, we get the default value of the session status
+ { statement =>
+
+ val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+ rs1.next()
+ defaultV1 = rs1.getString(1)
+ assert(defaultV1 != "200")
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SET hive.cli.print.header")
+ rs2.next()
+
+ defaultV2 = rs2.getString(1)
+ assert(defaultV2 != "true")
+ rs2.close()
+ },
+
+ // second session, we update the session status
+ { statement =>
+
+ val queries = Seq(
+ s"SET ${SQLConf.SHUFFLE_PARTITIONS}=291",
+ "SET hive.cli.print.header=true"
+ )
+
+ queries.map(statement.execute)
+ val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+ rs1.next()
+ assert("spark.sql.shuffle.partitions=291" === rs1.getString(1))
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SET hive.cli.print.header")
+ rs2.next()
+ assert("hive.cli.print.header=true" === rs2.getString(1))
+ rs2.close()
+ },
+
+ // third session, we get the latest session status, supposed to be the
+ // default value
+ { statement =>
+
+ val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+ rs1.next()
+ assert(defaultV1 === rs1.getString(1))
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SET hive.cli.print.header")
+ rs2.next()
+ assert(defaultV2 === rs2.getString(1))
+ rs2.close()
+ },
+
+ // accessing the cached data in another session
+ { statement =>
+
+ val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
+ val buf1 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs1.next()) {
+ buf1 += rs1.getInt(1)
+ }
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf2 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs2.next()) {
+ buf2 += rs2.getInt(1)
+ }
+ rs2.close()
+
+ assert(buf1 === buf2)
+ statement.executeQuery("UNCACHE TABLE test_table")
+
+ // TODO: figure out how to determine whether the data was loaded from the cache
+ val rs3 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf3 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs3.next()) {
+ buf3 += rs3.getInt(1)
+ }
+ rs3.close()
+
+ assert(buf1 === buf3)
+ },
+
+ // accessing the uncached table
+ { statement =>
+
+ // TODO: figure out how to determine whether the data was loaded from the cache
+ val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
+ val buf1 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs1.next()) {
+ buf1 += rs1.getInt(1)
+ }
+ rs1.close()
+
+ val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+ val buf2 = new collection.mutable.ArrayBuffer[Int]()
+ while (rs2.next()) {
+ buf2 += rs2.getInt(1)
+ }
+ rs2.close()
+
+ assert(buf1 === buf2)
+ }
+ )
+ }
}
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
@@ -245,15 +385,22 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
s"jdbc:hive2://localhost:$serverPort/"
}
- protected def withJdbcStatement(f: Statement => Unit): Unit = {
- val connection = DriverManager.getConnection(jdbcUri, user, "")
- val statement = connection.createStatement()
-
- try f(statement) finally {
- statement.close()
- connection.close()
+ def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) {
+ val user = System.getProperty("user.name")
+ val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
+ val statements = connections.map(_.createStatement())
+
+ try {
+ statements.zip(fs).map { case (s, f) => f(s) }
+ } finally {
+ statements.map(_.close())
+ connections.map(_.close())
}
}
+
+ def withJdbcStatement(f: Statement => Unit) {
+ withMultipleConnectionJdbcStatement(f)
+ }
}
abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll with Logging {
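For reference, a minimal usage sketch of the `withMultipleConnectionJdbcStatement` helper added above (the setting name mirrors the test; each block runs on its own JDBC connection, so a per-session override in one block should not be visible in the next):

withMultipleConnectionJdbcStatement(
  // Session 1: override a per-session setting.
  { statement =>
    statement.execute("SET spark.sql.shuffle.partitions=291")
  },
  // Session 2: a fresh connection should still see the default value.
  { statement =>
    val rs = statement.executeQuery("SET spark.sql.shuffle.partitions")
    rs.next()
    assert(rs.getString(1) != "spark.sql.shuffle.partitions=291")
    rs.close()
  }
)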