diff options
author | Herman van Hovell <hvanhovell@databricks.com> | 2017-03-28 23:14:31 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-03-28 23:14:31 +0800 |
commit | f82461fc1197f6055d9cf972d82260b178e10a7c (patch) | |
tree | 36bb1f58ce3080b1b2d86cd8c2b99148d07cbf0c /sql/core/src/main/scala/org/apache | |
parent | 4fcc214d9eb5e98b2eed3e28cc23b0c511cd9007 (diff) | |
download | spark-f82461fc1197f6055d9cf972d82260b178e10a7c.tar.gz spark-f82461fc1197f6055d9cf972d82260b178e10a7c.tar.bz2 spark-f82461fc1197f6055d9cf972d82260b178e10a7c.zip |
[SPARK-20126][SQL] Remove HiveSessionState
## What changes were proposed in this pull request?
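Commit https://github.com/apache/spark/commit/ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 moved most of the logic from the SessionState classes into accompanying builders, which makes `HiveSessionState` redundant. This PR removes `HiveSessionState`. As part of that, `SessionState.addJar` moves into `SessionResourceLoader` (renamed from `SessionFunctionResourceLoader`), which is now exposed as `SessionState.resourceLoader`.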
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes #17457 from hvanhovell/SPARK-20126.
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
3 files changed, 32 insertions, 25 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala index 20b0894667..2e859cf1ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala @@ -37,7 +37,7 @@ case class AddJarCommand(path: String) extends RunnableCommand { } override def run(sparkSession: SparkSession): Seq[Row] = { - sparkSession.sessionState.addJar(path) + sparkSession.sessionState.resourceLoader.addJar(path) Seq(Row(0)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index b5b0bb0bfc..c6241d923d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -63,6 +63,7 @@ private[sql] class SessionState( val optimizer: Optimizer, val planner: SparkPlanner, val streamingQueryManager: StreamingQueryManager, + val resourceLoader: SessionResourceLoader, createQueryExecution: LogicalPlan => QueryExecution, createClone: (SparkSession, SessionState) => SessionState) { @@ -106,27 +107,6 @@ private[sql] class SessionState( def refreshTable(tableName: String): Unit = { catalog.refreshTable(sqlParser.parseTableIdentifier(tableName)) } - - /** - * Add a jar path to [[SparkContext]] and the classloader. - * - * Note: this method seems not access any session state, but the subclass `HiveSessionState` needs - * to add the jar to its hive client for the current session. Hence, it still needs to be in - * [[SessionState]]. 
- */ - def addJar(path: String): Unit = { - sparkContext.addJar(path) - val uri = new Path(path).toUri - val jarURL = if (uri.getScheme == null) { - // `path` is a local file path without a URL scheme - new File(path).toURI.toURL - } else { - // `path` is a URL with a scheme - uri.toURL - } - sharedState.jarClassLoader.addURL(jarURL) - Thread.currentThread().setContextClassLoader(sharedState.jarClassLoader) - } } private[sql] object SessionState { @@ -160,10 +140,10 @@ class SessionStateBuilder( * Session shared [[FunctionResourceLoader]]. */ @InterfaceStability.Unstable -class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader { +class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader { override def loadResource(resource: FunctionResource): Unit = { resource.resourceType match { - case JarResource => session.sessionState.addJar(resource.uri) + case JarResource => addJar(resource.uri) case FileResource => session.sparkContext.addFile(resource.uri) case ArchiveResource => throw new AnalysisException( @@ -171,4 +151,25 @@ class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResou "please use --archives options while calling spark-submit.") } } + + /** + * Add a jar path to [[SparkContext]] and the classloader. + * + * Note: this method seems not access any session state, but the subclass `HiveSessionState` needs + * to add the jar to its hive client for the current session. Hence, it still needs to be in + * [[SessionState]]. 
+ */ + def addJar(path: String): Unit = { + session.sparkContext.addJar(path) + val uri = new Path(path).toUri + val jarURL = if (uri.getScheme == null) { + // `path` is a local file path without a URL scheme + new File(path).toURI.toURL + } else { + // `path` is a URL with a scheme + uri.toURL + } + session.sharedState.jarClassLoader.addURL(jarURL) + Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala index 6b5559adb1..b8f645fdee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala @@ -110,6 +110,11 @@ abstract class BaseSessionStateBuilder( protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf) /** + * ResourceLoader that is used to load function resources and jars. + */ + protected lazy val resourceLoader: SessionResourceLoader = new SessionResourceLoader(session) + + /** * Catalog for managing table and database states. If there is a pre-existing catalog, the state * of that catalog (temp tables & current database) will be copied into the new catalog. * @@ -123,7 +128,7 @@ abstract class BaseSessionStateBuilder( conf, SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), sqlParser, - new SessionFunctionResourceLoader(session)) + resourceLoader) parentState.foreach(_.catalog.copyStateTo(catalog)) catalog } @@ -251,6 +256,7 @@ abstract class BaseSessionStateBuilder( optimizer, planner, streamingQueryManager, + resourceLoader, createQueryExecution, createClone) } |