author     Reynold Xin <rxin@databricks.com>    2017-03-29 19:06:51 -0700
committer  Reynold Xin <rxin@databricks.com>    2017-03-29 19:06:51 -0700
commit     60977889eaecdf28adc6164310eaa5afed488fa1 (patch)
tree       8da9fd91a7b4936f09df5dda098f535125703b64 /sql/core
parent     22f07fefe11f0147f1e8d83d9b77707640d5dc97 (diff)
[SPARK-20136][SQL] Add num files and metadata operation timing to scan operator metrics
## What changes were proposed in this pull request?

This patch adds explicit metadata operation timing and the number of files read to the data source scan operator's metrics. Both are useful for performance profiling.

Screenshot of the UI with this change ("number of files" and "metadata time" are the new metrics):

<img width="321" alt="screen shot 2017-03-29 at 12 29 28 am" src="https://cloud.githubusercontent.com/assets/323388/24443272/d4ea58c0-1416-11e7-8940-ecb69375554a.png">

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #17465 from rxin/SPARK-20136.
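The core of the change is a driver-side metric pattern: time the `listFiles` call, record the elapsed milliseconds and the file count into `SQLMetric`s, and post the updates explicitly via `SQLMetrics.postDriverMetricUpdates`, because metric updates made on the driver are not shipped back through task completion. Below is a minimal sketch of that pattern factored into a standalone helper; the `DriverMetricTiming` object and its `timed` method are illustrative names, not part of this patch, and the sketch assumes it compiles inside Spark's own `org.apache.spark.sql.execution` package, as the patched operator does.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical helper mirroring the pattern used in FileSourceScanExec.selectedPartitions.
object DriverMetricTiming {

  /** Runs `body` on the driver and records its wall-clock time (in ms) in `timingMetric`. */
  def timed[T](sc: SparkContext, timingMetric: SQLMetric)(body: => T): T = {
    val startTime = System.nanoTime()
    val result = body
    timingMetric.add((System.nanoTime() - startTime) / 1000 / 1000) // nanoseconds -> ms

    // Driver-only metric updates are not reported via task completion, so they
    // have to be posted explicitly for the SQL UI to pick them up.
    val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sc, executionId, timingMetric :: Nil)
    result
  }
}
```

In the patch itself this logic stays inlined in `selectedPartitions`, which also sums `ret.map(_.files.size.toLong)` into the `numFiles` metric before posting both metrics together.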
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala | 18
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 28156b277f..239151495f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -171,8 +171,20 @@ case class FileSourceScanExec(
     false
   }
 
-  @transient private lazy val selectedPartitions =
-    relation.location.listFiles(partitionFilters, dataFilters)
+  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+    val startTime = System.nanoTime()
+    val ret = relation.location.listFiles(partitionFilters, dataFilters)
+    val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
+
+    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(timeTaken)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+
+    ret
+  }
 
   override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -293,6 +305,8 @@ case class FileSourceScanExec(
 
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
+      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
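With the patch applied, the two new metrics appear on the scan node in the SQL tab of the web UI alongside "scan time". They can also be read programmatically from the physical plan; here is a rough usage sketch, assuming a spark-shell session (`spark` in scope) and a Parquet dataset at a hypothetical path:

```scala
import org.apache.spark.sql.execution.FileSourceScanExec

// Hypothetical illustration: run a scan, then read the driver-side metric
// values introduced by this patch directly from the executed plan.
val df = spark.read.parquet("/tmp/events")   // path is an assumption
df.collect()

val scan = df.queryExecution.executedPlan.collect {
  case s: FileSourceScanExec => s
}.head
println(scan.metrics("numFiles").value)       // number of files listed
println(scan.metrics("metadataTime").value)   // file listing time, in ms
```

Note that "metadata time" is created with `SQLMetrics.createMetric` rather than `createTimingMetric`, so the UI shows it as a plain number with the unit spelled out in the metric name.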