path: root/sql/core/src/main/scala/org/apache
author     Reynold Xin <rxin@databricks.com>   2017-03-29 00:02:15 -0700
committer  Reynold Xin <rxin@databricks.com>   2017-03-29 00:02:15 -0700
commit     9712bd3954c029de5c828f27b57d46e4a6325a38 (patch)
tree       fbd8403b5c40ca234009f03913837121cd6be4cf /sql/core/src/main/scala/org/apache
parent     a5c87707eaec5cacdfb703eb396dfc264bc54cda (diff)
[SPARK-20134][SQL] SQLMetrics.postDriverMetricUpdates to simplify driver side metric updates
## What changes were proposed in this pull request?

It is not super intuitive how to update SQLMetric on the driver side. This patch introduces a new SQLMetrics.postDriverMetricUpdates function to do that, and adds documentation to make it more obvious.

## How was this patch tested?

Updated a test case to use this method.

Author: Reynold Xin <rxin@databricks.com>

Closes #17464 from rxin/SPARK-20134.
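A minimal sketch of the call pattern the new helper enables, assuming a live SparkContext `sc` and a `metrics` map like the one each physical operator carries; `postDriverMetricUpdates` and `SQLExecution.EXECUTION_ID_KEY` come from the patched code, while the surrounding method name is illustrative only:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative driver-side code path (not part of this patch); it mirrors the
// SubqueryExec and BroadcastExchangeExec call sites changed below.
def reportDriverSideMetrics(sc: SparkContext, metrics: Map[String, SQLMetric]): Unit = {
  // The execution id is set by SQLExecution.withNewExecutionId; it can be null when
  // SparkPlan.doExecute is invoked directly, which postDriverMetricUpdates tolerates.
  val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  metrics("dataSize") += 1024L // driver-side update, e.g. bytes built for a broadcast
  SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
}
```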
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala            8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala    8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala                20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala                    7
4 files changed, 29 insertions, 14 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d876688a8a..66a8e044ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -628,13 +628,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
val dataSize = rows.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
longMetric("dataSize") += dataSize
- // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
- // directly without setting an execution id. We should be tolerant to it.
- if (executionId != null) {
- sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
- executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
- }
-
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
rows
}
}(SubqueryExec.executionContext)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 7be5d31d4a..efcaca9338 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -97,13 +97,7 @@ case class BroadcastExchangeExec(
val broadcasted = sparkContext.broadcast(relation)
longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
- // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
- // directly without setting an execution id. We should be tolerant to it.
- if (executionId != null) {
- sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
- executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
- }
-
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
broadcasted
} catch {
case oe: OutOfMemoryError =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index dbc27d8b23..ef982a4ebd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -22,9 +22,15 @@ import java.util.Locale
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
+/**
+ * A metric used in a SQL query plan. This is implemented as an [[AccumulatorV2]]. Updates on
+ * the executor side are automatically propagated and shown in the SQL UI through metrics. Updates
+ * on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]].
+ */
class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
// This is a workaround for SPARK-11013.
// We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
@@ -126,4 +132,18 @@ object SQLMetrics {
s"\n$sum ($min, $med, $max)"
}
}
+
+ /**
+ * Updates metrics based on the driver side value. This is useful for certain metrics that
+ * are only updated on the driver, e.g. subquery execution time, or number of files.
+ */
+ def postDriverMetricUpdates(
+ sc: SparkContext, executionId: String, metrics: Seq[SQLMetric]): Unit = {
+ // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
+ // directly without setting an execution id. We should be tolerant to it.
+ if (executionId != null) {
+ sc.listenerBus.post(
+ SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
+ }
+ }
}
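The helper added above centralizes the null check on the execution id that both call sites previously repeated. A minimal sketch of invoking it directly, assuming a live SparkContext `sc` and an execution id string that may be null; `publishDriverSideSize` is an illustrative name, while `createSizeMetric` is the existing factory in the SQLMetrics object:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hedged sketch, not part of this patch: create a size metric, update it on the
// driver, and publish it through the new helper. When executionId is null the
// helper simply skips posting to the listener bus.
def publishDriverSideSize(sc: SparkContext, executionId: String, bytes: Long): Unit = {
  val dataSize: SQLMetric = SQLMetrics.createSizeMetric(sc, "data size")
  dataSize += bytes
  SQLMetrics.postDriverMetricUpdates(sc, executionId, Seq(dataSize))
}
```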
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 12d3bc9281..b4a91230a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -47,6 +47,13 @@ case class SparkListenerSQLExecutionStart(
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
extends SparkListenerEvent
+/**
+ * A message used to update SQL metric value for driver-side updates (which doesn't get reflected
+ * automatically).
+ *
+ * @param executionId The execution id for a query, so we can find the query plan.
+ * @param accumUpdates Map from accumulator id to the metric value (metrics are always 64-bit ints).
+ */
@DeveloperApi
case class SparkListenerDriverAccumUpdates(
executionId: Long,