author     Davies Liu <davies@databricks.com>    2015-10-13 09:57:53 -0700
committer  Yin Huai <yhuai@databricks.com>       2015-10-13 09:57:53 -0700
commit     d0cc79ccd0b4500bd6b18184a723dabc164e8abd
tree       1b62b02113975c2ae800c703bdb598189982f778
parent     1797055dbf1d2fd7714d7c65c8d2efde2f15efc1
[SPARK-11030] [SQL] share the SQLTab across sessions
The SQLTab will be shared by multiple sessions. If we create multiple independent SQLContexts (not derived via newSession()), we will still see multiple SQLTabs in the Spark UI.

Author: Davies Liu <davies@databricks.com>

Closes #9048 from davies/sqlui.
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                    | 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala      | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala           |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala |  8
4 files changed, 24 insertions(+), 21 deletions(-)
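
As a hedged sketch of the behavior this buys (the SparkContext setup is illustrative, and `listener` is private[sql], so the identity check is left as a comment):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("sql-ui-demo"))

    // A root SQLContext registers one SQLListener and one SQLTab on the UI.
    val root = new SQLContext(sc)

    // Sessions derived via newSession() reuse the root's listener and tab,
    // so the Spark UI keeps a single "SQL" tab for all of them.
    val session = root.newSession()
    // session.listener eq root.listener  // true, but private[sql] scope

    // Independent contexts (not created via newSession()) still register
    // their own listener and tab, as the commit message notes.
    val other = new SQLContext(sc)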
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1bd2913892..cd937257d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -65,12 +65,15 @@ import org.apache.spark.util.Utils
class SQLContext private[sql](
@transient val sparkContext: SparkContext,
@transient protected[sql] val cacheManager: CacheManager,
+ @transient private[sql] val listener: SQLListener,
val isRootContext: Boolean)
extends org.apache.spark.Logging with Serializable {
self =>
- def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager, true)
+ def this(sparkContext: SparkContext) = {
+ this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
+ }
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
// If spark.sql.allowMultipleContexts is false, we will throw an exception if a user
@@ -97,7 +100,7 @@ class SQLContext private[sql](
/**
* Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
- * registered functions, but sharing the same SparkContext and CacheManager.
+ * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
*
* @since 1.6.0
*/
@@ -105,6 +108,7 @@ class SQLContext private[sql](
new SQLContext(
sparkContext = sparkContext,
cacheManager = cacheManager,
+ listener = listener,
isRootContext = false)
}
@@ -113,11 +117,6 @@ class SQLContext private[sql](
*/
protected[sql] lazy val conf = new SQLConf
- // `listener` should be only used in the driver
- @transient private[sql] val listener = new SQLListener(this)
- sparkContext.addSparkListener(listener)
- sparkContext.ui.foreach(new SQLTab(this, _))
-
/**
* Set Spark SQL configuration properties.
*
@@ -1312,4 +1311,14 @@ object SQLContext {
): InternalRow
}
}
+
+ /**
+ * Create a SQLListener, add it to the SparkContext, and create a SQLTab if a SparkUI exists.
+ */
+ private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
+ val listener = new SQLListener(sc.conf)
+ sc.addSparkListener(listener)
+ sc.ui.foreach(new SQLTab(listener, _))
+ listener
+ }
}
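
Restated outside the diff, the registration path for a root context is short; the comments here are editorial, not from the patch:

    private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
      // One listener per root context, configured from the SparkConf alone.
      val listener = new SQLListener(sc.conf)
      // Register on the shared listener bus so SQL execution events reach it.
      sc.addSparkListener(listener)
      // sc.ui is None when spark.ui.enabled=false; no SQLTab is attached then.
      sc.ui.foreach(new SQLTab(listener, _))
      listener
    }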
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5779c71f64..d6472400a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,19 +19,15 @@ package org.apache.spark.sql.execution.ui
import scala.collection.mutable
-import com.google.common.annotations.VisibleForTesting
-
-import org.apache.spark.{JobExecutionStatus, Logging}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
-import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
-private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener with Logging {
+private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
- private val retainedExecutions =
- sqlContext.sparkContext.conf.getInt("spark.sql.ui.retainedExecutions", 1000)
+ private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
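
Decoupling the listener from SQLContext means it can be built anywhere a SparkConf is at hand; a minimal sketch, assuming a live SparkContext `sc`:

    // The only setting the listener reads is the cap on completed
    // executions retained for the UI (default 1000).
    val conf = sc.conf.clone().set("spark.sql.ui.retainedExecutions", "200")
    val listener = new SQLListener(conf)
    sc.addSparkListener(listener)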
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 0b0867f67e..9c27944d42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -20,14 +20,12 @@ package org.apache.spark.sql.execution.ui
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
import org.apache.spark.ui.{SparkUI, SparkUITab}
-private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
+private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
val parent = sparkUI
- val listener = sqlContext.listener
attachPage(new AllExecutionsPage(this))
attachPage(new ExecutionPage(this))
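
Since the listener is now passed in directly, attaching a tab needs no SQLContext; a sketch of the internal wiring (both classes are private[sql], so this mirrors createListenerAndUI rather than a public API):

    // nextTabName yields "SQL", "SQL1", ... which is why independent root
    // contexts still show separate tabs while derived sessions share one.
    sc.ui.foreach { ui =>
      new SQLTab(listener, ui)  // listener from createListenerAndUI
    }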
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 7a46c69a05..727cf3665a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -74,7 +74,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
}
test("basic") {
- val listener = new SQLListener(sqlContext)
+ val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
val accumulatorIds =
@@ -212,7 +212,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
}
test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
- val listener = new SQLListener(sqlContext)
+ val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
listener.onExecutionStart(
@@ -241,7 +241,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
}
test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
- val listener = new SQLListener(sqlContext)
+ val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
listener.onExecutionStart(
@@ -281,7 +281,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
}
test("onExecutionEnd happens before onJobEnd(JobFailed)") {
- val listener = new SQLListener(sqlContext)
+ val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
listener.onExecutionStart(
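
The test updates follow mechanically: each suite now builds the listener from the SparkContext's conf instead of a SQLContext. A hedged sketch of the pattern (the test name is hypothetical):

    test("listener can be built from a bare SparkConf") {
      // No SQLContext is required; the suite's SparkConf suffices, and the
      // listener is then driven directly via onExecutionStart, onJobStart,
      // onJobEnd, etc., exactly as in the suites above.
      val listener = new SQLListener(sqlContext.sparkContext.conf)
    }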