aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-10-08 17:34:24 -0700
committerDavies Liu <davies.liu@gmail.com>2015-10-08 17:34:24 -0700
commit3390b400d04e40f767d8a51f1078fcccb4e64abd (patch)
treed48ed36a14abf0b15467c9ae9c7c04933fdd3a19 /sql/catalyst
parent84ea287178247c163226e835490c9c70b17d8d3b (diff)
downloadspark-3390b400d04e40f767d8a51f1078fcccb4e64abd.tar.gz
spark-3390b400d04e40f767d8a51f1078fcccb4e64abd.tar.bz2
spark-3390b400d04e40f767d8a51f1078fcccb4e64abd.zip
[SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL
This PR improves session management by replacing the thread-local based approach with a one-SQLContext-per-session approach, introducing separate temporary tables and UDFs/UDAFs for each session. A new session of SQLContext can be created by: 1) creating a new SQLContext 2) calling newSession() on an existing SQLContext For HiveContext, in order to reduce the cost for each session, the classloader and Hive client are shared across multiple sessions (created by newSession). CacheManager is also shared by multiple sessions, so caching a table multiple times in different sessions will not cause multiple copies of the in-memory cache. Added jars are still shared by all the sessions, because SparkContext does not support sessions. cc marmbrus yhuai rxin Author: Davies Liu <davies@databricks.com> Closes #8909 from davies/sessions.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala28
1 file changed, 21 insertions, 7 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e6122d92b7..ba77b70a37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -51,23 +51,37 @@ class SimpleFunctionRegistry extends FunctionRegistry {
private val functionBuilders =
StringKeyHashMap[(ExpressionInfo, FunctionBuilder)](caseSensitive = false)
- override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder)
- : Unit = {
+ override def registerFunction(
+ name: String,
+ info: ExpressionInfo,
+ builder: FunctionBuilder): Unit = synchronized {
functionBuilders.put(name, (info, builder))
}
override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
- val func = functionBuilders.get(name).map(_._2).getOrElse {
- throw new AnalysisException(s"undefined function $name")
+ val func = synchronized {
+ functionBuilders.get(name).map(_._2).getOrElse {
+ throw new AnalysisException(s"undefined function $name")
+ }
}
func(children)
}
- override def listFunction(): Seq[String] = functionBuilders.iterator.map(_._1).toList.sorted
+ override def listFunction(): Seq[String] = synchronized {
+ functionBuilders.iterator.map(_._1).toList.sorted
+ }
- override def lookupFunction(name: String): Option[ExpressionInfo] = {
+ override def lookupFunction(name: String): Option[ExpressionInfo] = synchronized {
functionBuilders.get(name).map(_._1)
}
+
+ def copy(): SimpleFunctionRegistry = synchronized {
+ val registry = new SimpleFunctionRegistry
+ functionBuilders.iterator.foreach { case (name, (info, builder)) =>
+ registry.registerFunction(name, info, builder)
+ }
+ registry
+ }
}
/**
@@ -257,7 +271,7 @@ object FunctionRegistry {
expression[InputFileName]("input_file_name")
)
- val builtin: FunctionRegistry = {
+ val builtin: SimpleFunctionRegistry = {
val fr = new SimpleFunctionRegistry
expressions.foreach { case (name, (info, builder)) => fr.registerFunction(name, info, builder) }
fr