From 7b1407c7b95c43299a30e891748824c4bc47e43b Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 17 Nov 2015 11:17:52 -0800 Subject: [SPARK-11089][SQL] Adds option for disabling multi-session in Thrift server This PR adds a new option `spark.sql.hive.thriftServer.singleSession` for disabling multi-session support in the Thrift server. Note that this option is added as a Spark configuration (retrieved from `SparkConf`) rather than Spark SQL configuration (retrieved from `SQLConf`). This is because all SQL configurations are session-ized. Since multi-session support is by default on, no JDBC connection can modify global configurations like the newly added one. Author: Cheng Lian Closes #9740 from liancheng/spark-11089.single-session-option. --- docs/sql-programming-guide.md | 14 ++++++ .../hive/thriftserver/SparkSQLSessionManager.scala | 6 ++- .../thriftserver/HiveThriftServer2Suites.scala | 51 +++++++++++++++++++++- .../org/apache/spark/sql/hive/HiveContext.scala | 3 ++ 4 files changed, 72 insertions(+), 2 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 6e02d6564b..e347754055 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -2051,6 +2051,20 @@ options. # Migration Guide +## Upgrading From Spark SQL 1.5 to 1.6 + + - From Spark 1.6, by default the Thrift server runs in multi-session mode. Which means each JDBC/ODBC + connection owns a copy of their own SQL configuration and temporary function registry. Cached + tables are still shared though. If you prefer to run the Thrift server in the old single-session + mode, please set option `spark.sql.hive.thriftServer.singleSession` to `true`. You may either add + this option to `spark-defaults.conf`, or pass it to `start-thriftserver.sh` via `--conf`: + + {% highlight bash %} + ./sbin/start-thriftserver.sh \ + --conf spark.sql.hive.thriftServer.singleSession=true \ + ... + {% endhighlight %} + ## Upgrading From Spark SQL 1.4 to 1.5 - Optimized execution using manually managed memory (Tungsten) is now enabled by default, along with diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 33aaead3fb..af4fcdf021 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -66,7 +66,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: val session = super.getSession(sessionHandle) HiveThriftServer2.listener.onSessionCreated( session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) - val ctx = hiveContext.newSession() + val ctx = if (hiveContext.hiveThriftServerSingleSession) { + hiveContext + } else { + hiveContext.newSession() + } ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx sessionHandle diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index eb1895f263..1dd898aa38 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -41,7 +41,6 @@ import org.apache.thrift.transport.TSocket import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkFunSuite} @@ -510,6 +509,53 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } } +class SingleSessionSuite extends HiveThriftJdbcTest { + override def mode: ServerMode.Value = ServerMode.binary + + override protected def extraConf: Seq[String] = + "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil + + test("test single session") { + withMultipleConnectionJdbcStatement( + { statement => + val jarPath = "../hive/src/test/resources/TestUDTF.jar" + val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath" + + // Configurations and temporary functions added in this session should be visible to all + // the other sessions. + Seq( + "SET foo=bar", + s"ADD JAR $jarURL", + s"""CREATE TEMPORARY FUNCTION udtf_count2 + |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' + """.stripMargin + ).foreach(statement.execute) + }, + + { statement => + val rs1 = statement.executeQuery("SET foo") + + assert(rs1.next()) + assert(rs1.getString(1) === "foo") + assert(rs1.getString(2) === "bar") + + val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2") + + assert(rs2.next()) + assert(rs2.getString(1) === "Function: udtf_count2") + + assert(rs2.next()) + assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") { + rs2.getString(1) + } + + assert(rs2.next()) + assert(rs2.getString(1) === "Usage: To be added.") + } + ) + } +} + class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { override def mode: ServerMode.Value = ServerMode.http @@ -600,6 +646,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl private var logTailingProcess: Process = _ private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String] + protected def extraConf: Seq[String] = Nil + protected def serverStartCommand(port: Int) = { val portConf = if (mode == ServerMode.binary) { ConfVars.HIVE_SERVER2_THRIFT_PORT @@ -635,6 +683,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl | --driver-class-path $driverClassPath | --driver-java-options -Dlog4j.debug | --conf spark.ui.enabled=false + | ${extraConf.mkString("\n")} """.stripMargin.split("\\s+").toSeq } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 2004f24ad2..c0bb5af7d5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -190,6 +190,9 @@ class HiveContext private[hive]( */ protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC) + protected[hive] def hiveThriftServerSingleSession: Boolean = + sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean + @transient protected[sql] lazy val substitutor = new VariableSubstitution() -- cgit v1.2.3