aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver/src
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-04-20 12:58:48 -0700
committerReynold Xin <rxin@databricks.com>2016-04-20 12:58:48 -0700
commit8fc267ab3322e46db81e725a5cb1adb5a71b2b4d (patch)
tree5332cbe84256366dbe8fb2c7e10b86c3a41d2f32 /sql/hive-thriftserver/src
parentcb8ea9e1f34b9af287b3d10e47f24de4307c63ba (diff)
downloadspark-8fc267ab3322e46db81e725a5cb1adb5a71b2b4d.tar.gz
spark-8fc267ab3322e46db81e725a5cb1adb5a71b2b4d.tar.bz2
spark-8fc267ab3322e46db81e725a5cb1adb5a71b2b4d.zip
[SPARK-14720][SPARK-13643] Move Hive-specific methods into HiveSessionState and Create a SparkSession class
## What changes were proposed in this pull request? This PR has two main changes. 1. Move Hive-specific methods from HiveContext to HiveSessionState, which help the work of removing HiveContext. 2. Create a SparkSession Class, which will later be the entry point of Spark SQL users. ## How was this patch tested? Existing tests This PR is trying to fix test failures of https://github.com/apache/spark/pull/12485. Author: Andrew Or <andrew@databricks.com> Author: Yin Huai <yhuai@databricks.com> Closes #12522 from yhuai/spark-session.
Diffstat (limited to 'sql/hive-thriftserver/src')
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala2
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala7
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala6
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala2
-rw-r--r--sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala2
5 files changed, 10 insertions, 9 deletions
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 673a293ce2..d89c3b4ab2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -195,7 +195,7 @@ private[hive] class SparkExecuteStatementOperation(
setState(OperationState.RUNNING)
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader =
- hiveContext.executionHive.state.getConf.getClassLoader
+ hiveContext.sessionState.executionHive.state.getConf.getClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
HiveThriftServer2.listener.onStatementStart(
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index b8bc8ea44d..7e8eada5ad 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveQueryExecution}
private[hive] class SparkSQLDriver(
val context: HiveContext = SparkSQLEnv.hiveContext)
@@ -41,7 +41,7 @@ private[hive] class SparkSQLDriver(
override def init(): Unit = {
}
- private def getResultSetSchema(query: context.QueryExecution): Schema = {
+ private def getResultSetSchema(query: HiveQueryExecution): Schema = {
val analyzed = query.analyzed
logDebug(s"Result Schema: ${analyzed.output}")
if (analyzed.output.isEmpty) {
@@ -59,7 +59,8 @@ private[hive] class SparkSQLDriver(
// TODO unify the error code
try {
context.sparkContext.setJobDescription(command)
- val execution = context.executePlan(context.sql(command).logicalPlan)
+ val execution =
+ context.executePlan(context.sql(command).logicalPlan).asInstanceOf[HiveQueryExecution]
hiveResponse = execution.stringResult()
tableSchema = getResultSetSchema(execution)
new CommandProcessorResponse(0)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index ae1d737b58..2679ac1854 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -58,9 +58,9 @@ private[hive] object SparkSQLEnv extends Logging {
sparkContext.addSparkListener(new StatsReportListener())
hiveContext = new HiveContext(sparkContext)
- hiveContext.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
- hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
- hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
+ hiveContext.sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
+ hiveContext.sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
+ hiveContext.sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index de4e9c62b5..f492b5656c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -71,7 +71,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
- val ctx = if (hiveContext.hiveThriftServerSingleSession) {
+ val ctx = if (hiveContext.sessionState.hiveThriftServerSingleSession) {
hiveContext
} else {
hiveContext.newSession()
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 0c468a408b..da410c68c8 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -47,7 +47,7 @@ private[thriftserver] class SparkSQLOperationManager()
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
val hiveContext = sessionToContexts(parentSession.getSessionHandle)
- val runInBackground = async && hiveContext.hiveThriftServerAsync
+ val runInBackground = async && hiveContext.sessionState.hiveThriftServerAsync
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
runInBackground)(hiveContext, sessionToActivePool)
handleToOperation.put(operation.getHandle, operation)