about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Reynold Xin <rxin@databricks.com> 2015-11-05 18:12:54 -0800
committer: Reynold Xin <rxin@databricks.com> 2015-11-05 18:12:54 -0800
commit: 3cc2c053b5d68c747a30bd58cf388b87b1922f13 (patch)
tree: 9eb52719b1434317ec87215a624e533031523bc2
parent: 5e31db70bb783656ba042863fcd3c223e17a8f81 (diff)
download: spark-3cc2c053b5d68c747a30bd58cf388b87b1922f13.tar.gz
          spark-3cc2c053b5d68c747a30bd58cf388b87b1922f13.tar.bz2
          spark-3cc2c053b5d68c747a30bd58cf388b87b1922f13.zip
[SPARK-11540][SQL] API audit for QueryExecutionListener.
Author: Reynold Xin <rxin@databricks.com>

Closes #9509 from rxin/SPARK-11540.
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 30
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 101
2 files changed, 72 insertions, 59 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index fc9174549e..c2142d03f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import com.google.common.annotations.VisibleForTesting
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
@@ -25,31 +27,33 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
/**
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
+ *
+ * While this is not a public class, we should avoid changing the function names for the sake of
+ * changing them, because a lot of developers use the feature for debugging.
*/
class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
- val analyzer = sqlContext.analyzer
- val optimizer = sqlContext.optimizer
- val planner = sqlContext.planner
- val cacheManager = sqlContext.cacheManager
- val prepareForExecution = sqlContext.prepareForExecution
- def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
+ @VisibleForTesting
+ def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)
+
+ lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
- lazy val analyzed: LogicalPlan = analyzer.execute(logical)
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
- cacheManager.useCachedData(analyzed)
+ sqlContext.cacheManager.useCachedData(analyzed)
}
- lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
+
+ lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
// TODO: Don't just pick the first one...
lazy val sparkPlan: SparkPlan = {
SparkPlan.currentContext.set(sqlContext)
- planner.plan(optimizedPlan).next()
+ sqlContext.planner.plan(optimizedPlan).next()
}
+
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
- lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
+ lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
@@ -57,11 +61,11 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
- def simpleString: String =
+ def simpleString: String = {
s"""== Physical Plan ==
|${stringOrError(executedPlan)}
""".stripMargin.trim
-
+ }
override def toString: String = {
def output =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 909a8abd22..ac432e2baa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -19,36 +19,38 @@ package org.apache.spark.sql.util
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable.ListBuffer
+import scala.util.control.NonFatal
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.Logging
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.execution.QueryExecution
/**
+ * :: Experimental ::
* The interface of query execution listener that can be used to analyze execution metrics.
*
- * Note that implementations should guarantee thread-safety as they will be used in a non
- * thread-safe way.
+ * Note that implementations should guarantee thread-safety as they can be invoked by
+ * multiple different threads.
*/
@Experimental
trait QueryExecutionListener {
/**
* A callback function that will be called when a query executed successfully.
- * Implementations should guarantee thread-safe.
+ * Note that this can be invoked by multiple different threads.
*
- * @param funcName the name of the action that triggered this query.
+ * @param funcName name of the action that triggered this query.
* @param qe the QueryExecution object that carries detail information like logical plan,
* physical plan, etc.
- * @param duration the execution time for this query in nanoseconds.
+ * @param durationNs the execution time for this query in nanoseconds.
*/
@DeveloperApi
- def onSuccess(funcName: String, qe: QueryExecution, duration: Long)
+ def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
/**
* A callback function that will be called when a query execution failed.
- * Implementations should guarantee thread-safe.
+ * Note that this can be invoked by multiple different threads.
*
* @param funcName the name of the action that triggered this query.
* @param qe the QueryExecution object that carries detail information like logical plan,
@@ -56,34 +58,20 @@ trait QueryExecutionListener {
* @param exception the exception that failed this query.
*/
@DeveloperApi
- def onFailure(funcName: String, qe: QueryExecution, exception: Exception)
+ def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
}
-@Experimental
-class ExecutionListenerManager extends Logging {
- private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
- private[this] val lock = new ReentrantReadWriteLock()
-
- /** Acquires a read lock on the cache for the duration of `f`. */
- private def readLock[A](f: => A): A = {
- val rl = lock.readLock()
- rl.lock()
- try f finally {
- rl.unlock()
- }
- }
- /** Acquires a write lock on the cache for the duration of `f`. */
- private def writeLock[A](f: => A): A = {
- val wl = lock.writeLock()
- wl.lock()
- try f finally {
- wl.unlock()
- }
- }
+/**
+ * :: Experimental ::
+ *
+ * Manager for [[QueryExecutionListener]]. See [[org.apache.spark.sql.SQLContext.listenerManager]].
+ */
+@Experimental
+class ExecutionListenerManager private[sql] () extends Logging {
/**
- * Registers the specified QueryExecutionListener.
+ * Registers the specified [[QueryExecutionListener]].
*/
@DeveloperApi
def register(listener: QueryExecutionListener): Unit = writeLock {
@@ -91,7 +79,7 @@ class ExecutionListenerManager extends Logging {
}
/**
- * Unregisters the specified QueryExecutionListener.
+ * Unregisters the specified [[QueryExecutionListener]].
*/
@DeveloperApi
def unregister(listener: QueryExecutionListener): Unit = writeLock {
@@ -99,38 +87,59 @@ class ExecutionListenerManager extends Logging {
}
/**
- * clears out all registered QueryExecutionListeners.
+ * Removes all the registered [[QueryExecutionListener]].
*/
@DeveloperApi
def clear(): Unit = writeLock {
listeners.clear()
}
- private[sql] def onSuccess(
- funcName: String,
- qe: QueryExecution,
- duration: Long): Unit = readLock {
- withErrorHandling { listener =>
- listener.onSuccess(funcName, qe, duration)
+ private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+ readLock {
+ withErrorHandling { listener =>
+ listener.onSuccess(funcName, qe, duration)
+ }
}
}
- private[sql] def onFailure(
- funcName: String,
- qe: QueryExecution,
- exception: Exception): Unit = readLock {
- withErrorHandling { listener =>
- listener.onFailure(funcName, qe, exception)
+ private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+ readLock {
+ withErrorHandling { listener =>
+ listener.onFailure(funcName, qe, exception)
+ }
}
}
+ private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
+
+ /** A lock to prevent updating the list of listeners while we are traversing through them. */
+ private[this] val lock = new ReentrantReadWriteLock()
+
private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = {
for (listener <- listeners) {
try {
f(listener)
} catch {
- case e: Exception => logWarning("error executing query execution listener", e)
+ case NonFatal(e) => logWarning("Error executing query execution listener", e)
}
}
}
+
+ /** Acquires a read lock on the cache for the duration of `f`. */
+ private def readLock[A](f: => A): A = {
+ val rl = lock.readLock()
+ rl.lock()
+ try f finally {
+ rl.unlock()
+ }
+ }
+
+ /** Acquires a write lock on the cache for the duration of `f`. */
+ private def writeLock[A](f: => A): A = {
+ val wl = lock.writeLock()
+ wl.lock()
+ try f finally {
+ wl.unlock()
+ }
+ }
}