author    Reynold Xin <rxin@databricks.com>    2017-03-30 23:09:33 -0700
committer Reynold Xin <rxin@databricks.com>    2017-03-30 23:09:33 -0700
commit    a8a765b3f302c078cb9519c4a17912cd38b9680c (patch)
tree      c3aee9f1bd5f3d09132eeea3ef531bb92ef36d00 /sql/core
parent    c734fc504a3f6a3d3b0bd90ff54604b17df2b413 (diff)
[SPARK-20151][SQL] Account for partition pruning in scan metadataTime metrics
## What changes were proposed in this pull request?

After SPARK-20136, we report a metadata timing metric in the scan operator. However, that metric doesn't include one of the most important parts of metadata operations: partition pruning. This patch adds that time measurement to the scan metrics.

## How was this patch tested?

N/A - I tried adding a test in SQLMetricsSuite, but it was extremely convoluted to the point that I'm not sure it is worth it.

Author: Reynold Xin <rxin@databricks.com>

Closes #17476 from rxin/SPARK-20151.
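A minimal, self-contained sketch of the timing pattern this patch applies (the object and its members are hypothetical stand-ins; only the `metadataOpsTimeNs` name and the millisecond conversion mirror the diff below):

```scala
// Hypothetical sketch of the pattern: metadata time already spent during
// query optimization is carried forward and folded into the metric
// reported at execution time.
object MetadataTimingSketch {
  // Stand-in for FileIndex.metadataOpsTimeNs: partition pruning time
  // recorded during optimization, if any (the value here is made up).
  def metadataOpsTimeNs: Option[Long] = Some(12L * 1000 * 1000) // 12 ms

  // Stand-in for the file listing done by the scan at execution time.
  def listFiles(): Seq[String] = Seq("part-00000", "part-00001")

  def main(args: Array[String]): Unit = {
    val optimizerMetadataTimeNs = metadataOpsTimeNs.getOrElse(0L)
    val startTime = System.nanoTime()
    val files = listFiles()
    // Both phases are reported under one millisecond-valued metric, as
    // FileSourceScanExec now does for "metadataTime".
    val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
    println(s"files=${files.size} metadataTime=${timeTakenMs}ms")
  }
}
```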
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala  10
3 files changed, 18 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 239151495f..2fa660c4d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -172,12 +172,13 @@ case class FileSourceScanExec(
}
@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+ val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
- val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
+ val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
- metrics("metadataTime").add(timeTaken)
+ metrics("metadataTime").add(timeTakenMs)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index db0254f8d5..4046396d0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -69,6 +69,7 @@ class CatalogFileIndex(
*/
def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
if (table.partitionColumnNames.nonEmpty) {
+ val startTime = System.nanoTime()
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
@@ -79,8 +80,9 @@ class CatalogFileIndex(
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
+ val timeNs = System.nanoTime() - startTime
new PrunedInMemoryFileIndex(
- sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
+ sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
} else {
new InMemoryFileIndex(
sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
@@ -111,7 +113,8 @@ private class PrunedInMemoryFileIndex(
sparkSession: SparkSession,
tableBasePath: Path,
fileStatusCache: FileStatusCache,
- override val partitionSpec: PartitionSpec)
+ override val partitionSpec: PartitionSpec,
+ override val metadataOpsTimeNs: Option[Long])
extends InMemoryFileIndex(
sparkSession,
partitionSpec.partitions.map(_.path),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 6b99d38fe5..094a66a282 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -72,4 +72,14 @@ trait FileIndex {
/** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
def partitionSchema: StructType
+
+ /**
+ * Returns an optional metadata operation time, in nanoseconds, for listing files.
+ *
+ * We do file listing during query optimization (in order to get proper statistics), and we want
+ * to account for that file listing time in physical execution (as metrics). To do that, some
+ * implementations save the file listing time, and physical execution calls this method to
+ * update the metrics.
+ */
+ def metadataOpsTimeNs: Option[Long] = None
}
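For illustration, a hedged sketch of how an implementation might use the new hook (the trait and class below are hypothetical stand-ins mirroring `FileIndex.metadataOpsTimeNs` and `PrunedInMemoryFileIndex`, not code from this patch):

```scala
// Hypothetical sketch; MetadataTimed and PruningTimedIndex are
// illustrative, not part of the patch.
trait MetadataTimed {
  // Mirrors FileIndex.metadataOpsTimeNs: None unless an implementation
  // has a recorded metadata-operation time to report.
  def metadataOpsTimeNs: Option[Long] = None
}

class PruningTimedIndex extends MetadataTimed {
  // Measure the construction-time metadata work (e.g. partition pruning),
  // the way CatalogFileIndex.filterPartitions times its pruning.
  private val pruningTimeNs: Long = {
    val start = System.nanoTime()
    // ... expensive partition pruning / file listing would happen here ...
    System.nanoTime() - start
  }

  // Expose it so the scan operator can fold it into "metadataTime".
  override val metadataOpsTimeNs: Option[Long] = Some(pruningTimeNs)
}
```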