about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala | 18
1 file changed, 16 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 28156b277f..239151495f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -171,8 +171,20 @@ case class FileSourceScanExec(
false
}
- @transient private lazy val selectedPartitions =
- relation.location.listFiles(partitionFilters, dataFilters)
+ @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+ val startTime = System.nanoTime()
+ val ret = relation.location.listFiles(partitionFilters, dataFilters)
+ val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
+
+ metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
+ metrics("metadataTime").add(timeTaken)
+
+ val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+ metrics("numFiles") :: metrics("metadataTime") :: Nil)
+
+ ret
+ }
override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -293,6 +305,8 @@ case class FileSourceScanExec(
override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
+ "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
protected override def doExecute(): RDD[InternalRow] = {