author     Andrew Or <andrew@databricks.com>      2016-04-20 12:58:48 -0700
committer  Reynold Xin <rxin@databricks.com>      2016-04-20 12:58:48 -0700
commit     8fc267ab3322e46db81e725a5cb1adb5a71b2b4d (patch)
tree       5332cbe84256366dbe8fb2c7e10b86c3a41d2f32 /sql/core
parent     cb8ea9e1f34b9af287b3d10e47f24de4307c63ba (diff)
[SPARK-14720][SPARK-13643] Move Hive-specific methods into HiveSessionState and Create a SparkSession class
## What changes were proposed in this pull request?

This PR has two main changes:
1. Move Hive-specific methods from HiveContext to HiveSessionState, which helps the work of removing HiveContext.
2. Create a SparkSession class, which will later be the entry point for Spark SQL users.

## How was this patch tested?

Existing tests. This PR is trying to fix test failures of https://github.com/apache/spark/pull/12485.

Author: Andrew Or <andrew@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12522 from yhuai/spark-session.
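For orientation, here is a minimal, hypothetical usage sketch of the layering this patch introduces (not part of the patch itself): the public entry point is still SQLContext, but it now wraps a private[sql] SparkSession that owns the SharedState and SessionState, and newSession() forks the per-session state while sharing the underlying SparkContext and catalog.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumed local setup for illustration only.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))

// Public entry point is still SQLContext; internally it now wraps a SparkSession.
val sqlContext = new SQLContext(sc)

// newSession() forks configurations, temp tables and functions, but shares the
// SparkContext, cached data and catalog through the underlying SharedState.
val isolated = sqlContext.newSession()
isolated.setConf("spark.sql.shuffle.partitions", "4")
```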
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala             |  52
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala           | 100
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala  |  54
3 files changed, 179 insertions(+), 27 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 781d699819..f3f84144ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -63,14 +63,18 @@ import org.apache.spark.util.Utils
* @since 1.0.0
*/
class SQLContext private[sql](
- @transient protected[sql] val sharedState: SharedState,
+ @transient private val sparkSession: SparkSession,
val isRootContext: Boolean)
extends Logging with Serializable {
self =>
+ private[sql] def this(sparkSession: SparkSession) = {
+ this(sparkSession, true)
+ }
+
def this(sc: SparkContext) = {
- this(new SharedState(sc), true)
+ this(new SparkSession(sc))
}
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -97,12 +101,15 @@ class SQLContext private[sql](
}
}
- def sparkContext: SparkContext = sharedState.sparkContext
-
+ protected[sql] def sessionState: SessionState = sparkSession.sessionState
+ protected[sql] def sharedState: SharedState = sparkSession.sharedState
+ protected[sql] def conf: SQLConf = sessionState.conf
protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
protected[sql] def listener: SQLListener = sharedState.listener
protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
+ def sparkContext: SparkContext = sharedState.sparkContext
+
/**
* Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
* tables, registered functions, but sharing the same [[SparkContext]], cached data and
@@ -110,14 +117,9 @@ class SQLContext private[sql](
*
* @since 1.6.0
*/
- def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
-
- /**
- * Per-session state, e.g. configuration, functions, temporary tables etc.
- */
- @transient
- protected[sql] lazy val sessionState: SessionState = new SessionState(self)
- protected[spark] def conf: SQLConf = sessionState.conf
+ def newSession(): SQLContext = {
+ new SQLContext(sparkSession.newSession(), isRootContext = false)
+ }
/**
* An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
@@ -132,10 +134,14 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
- def setConf(props: Properties): Unit = conf.setConf(props)
+ def setConf(props: Properties): Unit = sessionState.setConf(props)
- /** Set the given Spark SQL configuration property. */
- private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = conf.setConf(entry, value)
+ /**
+ * Set the given Spark SQL configuration property.
+ */
+ private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
+ sessionState.setConf(entry, value)
+ }
/**
* Set the given Spark SQL configuration property.
@@ -143,7 +149,7 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
- def setConf(key: String, value: String): Unit = conf.setConfString(key, value)
+ def setConf(key: String, value: String): Unit = sessionState.setConf(key, value)
/**
* Return the value of Spark SQL configuration property for the given key.
@@ -186,23 +192,19 @@ class SQLContext private[sql](
*/
def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
- // Extract `spark.sql.*` entries and put it in our SQLConf.
- // Subclasses may additionally set these entries in other confs.
- SQLContext.getSQLProperties(sparkContext.getConf).asScala.foreach { case (k, v) =>
- setConf(k, v)
- }
-
protected[sql] def parseSql(sql: String): LogicalPlan = sessionState.sqlParser.parsePlan(sql)
protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
- protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan)
+ protected[sql] def executePlan(plan: LogicalPlan): QueryExecution = {
+ sessionState.executePlan(plan)
+ }
/**
* Add a jar to SQLContext
*/
protected[sql] def addJar(path: String): Unit = {
- sparkContext.addJar(path)
+ sessionState.addJar(path)
}
/** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
@@ -768,7 +770,7 @@ class SQLContext private[sql](
* as Spark can parse all supported Hive DDLs itself.
*/
private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
- throw new UnsupportedOperationException
+ sessionState.runNativeSql(sqlText).map { r => Row(r) }
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
new file mode 100644
index 0000000000..17ba299825
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.util.Utils
+
+
+/**
+ * The entry point to Spark execution.
+ */
+class SparkSession private(
+ sparkContext: SparkContext,
+ existingSharedState: Option[SharedState]) { self =>
+
+ def this(sc: SparkContext) {
+ this(sc, None)
+ }
+
+ /**
+ * Start a new session where configurations, temp tables, temp functions etc. are isolated.
+ */
+ def newSession(): SparkSession = {
+ // Note: materialize the shared state here to ensure the parent and child sessions are
+ // initialized with the same shared state.
+ new SparkSession(sparkContext, Some(sharedState))
+ }
+
+ @transient
+ protected[sql] lazy val sharedState: SharedState = {
+ existingSharedState.getOrElse(
+ SparkSession.reflect[SharedState, SparkContext](
+ SparkSession.sharedStateClassName(sparkContext.conf),
+ sparkContext))
+ }
+
+ @transient
+ protected[sql] lazy val sessionState: SessionState = {
+ SparkSession.reflect[SessionState, SQLContext](
+ SparkSession.sessionStateClassName(sparkContext.conf),
+ new SQLContext(self, isRootContext = false))
+ }
+
+}
+
+
+private object SparkSession {
+
+ private def sharedStateClassName(conf: SparkConf): String = {
+ conf.get(CATALOG_IMPLEMENTATION) match {
+ case "hive" => "org.apache.spark.sql.hive.HiveSharedState"
+ case "in-memory" => classOf[SharedState].getCanonicalName
+ }
+ }
+
+ private def sessionStateClassName(conf: SparkConf): String = {
+ conf.get(CATALOG_IMPLEMENTATION) match {
+ case "hive" => "org.apache.spark.sql.hive.HiveSessionState"
+ case "in-memory" => classOf[SessionState].getCanonicalName
+ }
+ }
+
+ /**
+ * Helper method to create an instance of [[T]] using a single-arg constructor that
+ * accepts an [[Arg]].
+ */
+ private def reflect[T, Arg <: AnyRef](
+ className: String,
+ ctorArg: Arg)(implicit ctorArgTag: ClassTag[Arg]): T = {
+ try {
+ val clazz = Utils.classForName(className)
+ val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass)
+ ctor.newInstance(ctorArg).asInstanceOf[T]
+ } catch {
+ case NonFatal(e) =>
+ throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
+ }
+ }
+
+}
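A note on the reflection in SparkSession above: the concrete HiveSharedState / HiveSessionState classes live in the sql/hive module, which sql/core cannot reference at compile time, so they are looked up by class name at runtime. The sketch below is illustrative only and uses placeholder class names (not Spark's); it shows the same single-arg-constructor reflection pattern in isolation.

```scala
import scala.reflect.ClassTag
import scala.util.control.NonFatal

// Placeholder state classes standing in for SharedState / HiveSharedState.
class BaseState(val name: String)
class SpecialState(name: String) extends BaseState(name)

object ReflectDemo {
  // Instantiate `className` via its constructor taking a single Arg,
  // mirroring the SparkSession.reflect helper in the patch above.
  def reflect[T, Arg <: AnyRef](className: String, ctorArg: Arg)
                               (implicit ctorArgTag: ClassTag[Arg]): T = {
    try {
      val clazz = Class.forName(className)
      val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass)
      ctor.newInstance(ctorArg).asInstanceOf[T]
    } catch {
      case NonFatal(e) =>
        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
    }
  }

  def main(args: Array[String]): Unit = {
    // The concrete class is chosen by name at runtime, so this code needs no
    // compile-time dependency on the module that defines SpecialState.
    val state = reflect[BaseState, String]("SpecialState", "demo")
    println(state.name)  // prints "demo"
  }
}
```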
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index d404a7c0ae..42915d5887 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -17,15 +17,22 @@
package org.apache.spark.sql.internal
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
import org.apache.spark.sql.util.ExecutionListenerManager
+
/**
* A class that holds all session-specific state in a given [[SQLContext]].
*/
@@ -37,7 +44,10 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* SQL-specific key-value configurations.
*/
- lazy val conf = new SQLConf
+ lazy val conf: SQLConf = new SQLConf
+
+ // Automatically extract `spark.sql.*` entries and put it in our SQLConf
+ setConf(SQLContext.getSQLProperties(ctx.sparkContext.getConf))
lazy val experimentalMethods = new ExperimentalMethods
@@ -101,5 +111,45 @@ private[sql] class SessionState(ctx: SQLContext) {
* Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
*/
lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
-}
+
+ // ------------------------------------------------------
+ // Helper methods, partially leftover from pre-2.0 days
+ // ------------------------------------------------------
+
+ def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(ctx, plan)
+
+ def refreshTable(tableName: String): Unit = {
+ catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+ }
+
+ def invalidateTable(tableName: String): Unit = {
+ catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
+ }
+
+ final def setConf(properties: Properties): Unit = {
+ properties.asScala.foreach { case (k, v) => setConf(k, v) }
+ }
+
+ final def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
+ conf.setConf(entry, value)
+ setConf(entry.key, entry.stringConverter(value))
+ }
+
+ def setConf(key: String, value: String): Unit = {
+ conf.setConfString(key, value)
+ }
+
+ def addJar(path: String): Unit = {
+ ctx.sparkContext.addJar(path)
+ }
+
+ def analyze(tableName: String): Unit = {
+ throw new UnsupportedOperationException
+ }
+
+ def runNativeSql(sql: String): Seq[String] = {
+ throw new UnsupportedOperationException
+ }
+
+}