author    Herman van Hovell <hvanhovell@databricks.com>  2017-03-28 23:14:31 +0800
committer Wenchen Fan <wenchen@databricks.com>           2017-03-28 23:14:31 +0800
commit    f82461fc1197f6055d9cf972d82260b178e10a7c (patch)
tree      36bb1f58ce3080b1b2d86cd8c2b99148d07cbf0c /sql/core/src/main/scala/org/apache
parent    4fcc214d9eb5e98b2eed3e28cc23b0c511cd9007 (diff)
[SPARK-20126][SQL] Remove HiveSessionState
## What changes were proposed in this pull request?

Commit https://github.com/apache/spark/commit/ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 moved most of the logic from the SessionState classes into an accompanying builder. This makes the existence of `HiveSessionState` redundant. This PR removes `HiveSessionState`.

## How was this patch tested?

Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17457 from hvanhovell/SPARK-20126.
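In code terms, the redundancy argument reads as follows: once every backend-specific decision is an overridable member of the builder, the built state class needs no subclasses. A minimal, hedged sketch with toy names (not Spark's real signatures):

// Toy sketch of the pattern; names are illustrative, not Spark's API.
// Before: backend behavior required `class HiveSessionState extends SessionState`.
// After: one concrete state class is enough, because the variation point
// lives in the builder instead.
final class ToySessionState(val description: String)

class ToyBuilder {
  // Backend-specific choice, overridable without touching the state class.
  protected def description: String = "in-memory session"
  def build(): ToySessionState = new ToySessionState(description)
}

class ToyHiveBuilder extends ToyBuilder {
  override protected def description: String = "hive session"
}

object ToyDemo extends App {
  println(new ToyBuilder().build().description)     // in-memory session
  println(new ToyHiveBuilder().build().description) // hive session
}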
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala   |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala         | 47
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala |  8
3 files changed, 32 insertions(+), 25 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
index 20b0894667..2e859cf1ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -37,7 +37,7 @@ case class AddJarCommand(path: String) extends RunnableCommand {
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.addJar(path)
+    sparkSession.sessionState.resourceLoader.addJar(path)
     Seq(Row(0))
   }
 }
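This call site backs Spark's ADD JAR SQL statement, so the change is observable from user code. A minimal way to exercise it end to end; the jar path below is hypothetical and would need to exist locally:

import org.apache.spark.sql.SparkSession

object AddJarDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("add-jar-demo")
      .getOrCreate()

    // Parses to AddJarCommand, whose run() now calls
    // sessionState.resourceLoader.addJar(path) instead of sessionState.addJar(path).
    spark.sql("ADD JAR /tmp/example-udfs.jar") // hypothetical path; must exist

    spark.stop()
  }
}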
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index b5b0bb0bfc..c6241d923d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -63,6 +63,7 @@ private[sql] class SessionState(
     val optimizer: Optimizer,
     val planner: SparkPlanner,
     val streamingQueryManager: StreamingQueryManager,
+    val resourceLoader: SessionResourceLoader,
     createQueryExecution: LogicalPlan => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState) {
 
@@ -106,27 +107,6 @@ private[sql] class SessionState(
   def refreshTable(tableName: String): Unit = {
     catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
   }
-
-  /**
-   * Add a jar path to [[SparkContext]] and the classloader.
-   *
-   * Note: this method does not seem to access any session state, but the subclass
-   * `HiveSessionState` needs to add the jar to its Hive client for the current session.
-   * Hence, it still needs to be in [[SessionState]].
-   */
-  def addJar(path: String): Unit = {
-    sparkContext.addJar(path)
-    val uri = new Path(path).toUri
-    val jarURL = if (uri.getScheme == null) {
-      // `path` is a local file path without a URL scheme
-      new File(path).toURI.toURL
-    } else {
-      // `path` is a URL with a scheme
-      uri.toURL
-    }
-    sharedState.jarClassLoader.addURL(jarURL)
-    Thread.currentThread().setContextClassLoader(sharedState.jarClassLoader)
-  }
 }
 
 private[sql] object SessionState {
@@ -160,10 +140,10 @@ class SessionStateBuilder(
  * Session shared [[FunctionResourceLoader]].
  */
 @InterfaceStability.Unstable
-class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
+class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
   override def loadResource(resource: FunctionResource): Unit = {
     resource.resourceType match {
-      case JarResource => session.sessionState.addJar(resource.uri)
+      case JarResource => addJar(resource.uri)
       case FileResource => session.sparkContext.addFile(resource.uri)
       case ArchiveResource =>
         throw new AnalysisException(
@@ -171,4 +151,25 @@ class SessionFunctionResourceLoader(session: SparkSession) extends FunctionReso
           "please use --archives options while calling spark-submit.")
     }
   }
+
+  /**
+   * Add a jar path to [[SparkContext]] and the classloader.
+   *
+   * Note: this method does not seem to access any session state, but the Hive-aware subclass
+   * of this loader needs to add the jar to its Hive client for the current session. Hence,
+   * it still needs to be in [[SessionResourceLoader]].
+   */
+  def addJar(path: String): Unit = {
+    session.sparkContext.addJar(path)
+    val uri = new Path(path).toUri
+    val jarURL = if (uri.getScheme == null) {
+      // `path` is a local file path without a URL scheme
+      new File(path).toURI.toURL
+    } else {
+      // `path` is a URL with a scheme
+      uri.toURL
+    }
+    session.sharedState.jarClassLoader.addURL(jarURL)
+    Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
+  }
 }
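The one subtle branch in the moved addJar is the scheme handling: a bare filesystem path has no URI scheme and must be converted through java.io.File to obtain a well-formed URL. A standalone sketch of just that branch, using java.net.URI as a stand-in for Hadoop's Path#toUri so it runs without Spark on the classpath:

import java.io.File
import java.net.{URI, URL}

object JarUrlSketch {
  // Mirrors the scheme branch of SessionResourceLoader.addJar (simplified;
  // the real code parses the path with org.apache.hadoop.fs.Path).
  def toJarUrl(path: String): URL = {
    val uri = new URI(path)
    if (uri.getScheme == null) {
      // `path` is a local file path without a URL scheme, e.g. "/tmp/udfs.jar"
      new File(path).toURI.toURL
    } else {
      // `path` already carries a scheme, e.g. "file:/tmp/udfs.jar"
      uri.toURL
    }
  }

  def main(args: Array[String]): Unit = {
    println(toJarUrl("/tmp/udfs.jar"))      // file:/tmp/udfs.jar
    println(toJarUrl("file:/tmp/udfs.jar")) // file:/tmp/udfs.jar
  }
}

In the real method the resulting URL is then added to the shared jarClassLoader, which is also installed as the thread's context classloader so classes loaded afterwards can see the new jar.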
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
index 6b5559adb1..b8f645fdee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
@@ -110,6 +110,11 @@ abstract class BaseSessionStateBuilder(
   protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
 
   /**
+   * ResourceLoader that is used to load function resources and jars.
+   */
+  protected lazy val resourceLoader: SessionResourceLoader = new SessionResourceLoader(session)
+
+  /**
    * Catalog for managing table and database states. If there is a pre-existing catalog, the state
    * of that catalog (temp tables & current database) will be copied into the new catalog.
    *
@@ -123,7 +128,7 @@ abstract class BaseSessionStateBuilder(
       conf,
       SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
       sqlParser,
-      new SessionFunctionResourceLoader(session))
+      resourceLoader)
     parentState.foreach(_.catalog.copyStateTo(catalog))
     catalog
   }
@@ -251,6 +256,7 @@ abstract class BaseSessionStateBuilder(
       optimizer,
       planner,
       streamingQueryManager,
+      resourceLoader,
       createQueryExecution,
       createClone)
   }