author     Wenchen Fan <wenchen@databricks.com>   2016-04-18 23:48:22 -0700
committer  Reynold Xin <rxin@databricks.com>      2016-04-18 23:48:22 -0700
commit     d4b94ead92177a18d78a9701cfde9979641d2a18
tree       225aedc668757619a4e90e308e4f7a9c9bc5f6c0
parent     6f8800689530591c8db856ade1f771fb30a12219
[SPARK-14595][SQL] add input metrics for FileScanRDD
## What changes were proposed in this pull request?

This is roughly based on the input metrics logic in `SqlNewHadoopRDD`.

## How was this patch tested?

Not sure how to write a test; I manually verified it in the Spark UI.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12352 from cloud-fan/metrics.
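For context on the approach the message refers to, here is a minimal sketch, not part of the patch and assuming Hadoop 2.5+ on the classpath, of the thread-local `FileSystem` statistics that this style of input-metrics logic reads from: snapshot the per-thread bytes-read counter before reading, then report the delta on top of any previously recorded value.

```scala
// Hedged sketch (not part of the patch): the thread-local Hadoop FileSystem
// statistics (Hadoop 2.5+) that this kind of input-metrics logic reads from.
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

// Sum of bytes read by the current thread across all registered FileSystems.
def threadBytesRead(): Long =
  FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum

val before = threadBytesRead()
// ... read some files on this thread, e.g. what readFunction(file) does ...
val after = threadBytesRead()
// FileScanRDD-style logic reports (after - before) on top of any bytes already
// recorded for the task (existingBytesRead in the patch).
val delta = after - before
```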
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala  60
1 file changed, 53 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 468e101fed..f86911e002 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,14 +18,16 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
/**
* A single file that should be read, along with partition column values that
* need to be prepended to each row. The reading should start at the first
- * valid record found after `offset`.
+ * valid record found after `start`.
*/
case class PartitionedFile(
partitionValues: InternalRow,
@@ -53,33 +55,77 @@ class FileScanRDD(
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
+ private val inputMetrics = context.taskMetrics().inputMetrics
+ private val existingBytesRead = inputMetrics.bytesRead
+
+ // Find a function that will return the FileSystem bytes read by this thread. Do this before
+ applying readFunction, because it might read some bytes.
+ private val getBytesReadCallback: Option[() => Long] =
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+
+ // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+ // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+ task and in the same thread, in which case we need to avoid overriding values written by
+ // previous partitions (SPARK-13071).
+ private def updateBytesRead(): Unit = {
+ getBytesReadCallback.foreach { getBytesRead =>
+ inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
+ }
+ }
+
+ // If we can't get the bytes read from the FS stats, fall back to the file size,
+ // which may be inaccurate.
+ private def updateBytesReadWithFileSize(): Unit = {
+ if (getBytesReadCallback.isEmpty && currentFile != null) {
+ inputMetrics.incBytesRead(currentFile.length)
+ }
+ }
+
private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+ private[this] var currentFile: PartitionedFile = null
private[this] var currentIterator: Iterator[Object] = null
def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
- def next() = currentIterator.next()
+ def next() = {
+ val nextElement = currentIterator.next()
+ // TODO: we should have a better separation of row based and batch based scan, so that we
+ // don't need to run this `if` for every record.
+ if (nextElement.isInstanceOf[ColumnarBatch]) {
+ inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
+ } else {
+ inputMetrics.incRecordsRead(1)
+ }
+ if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+ updateBytesRead()
+ }
+ nextElement
+ }
/** Advances to the next file. Returns true if a new non-empty iterator is available. */
private def nextIterator(): Boolean = {
+ updateBytesReadWithFileSize()
if (files.hasNext) {
- val nextFile = files.next()
- logInfo(s"Reading File $nextFile")
- InputFileNameHolder.setInputFileName(nextFile.filePath)
- currentIterator = readFunction(nextFile)
+ currentFile = files.next()
+ logInfo(s"Reading File $currentFile")
+ InputFileNameHolder.setInputFileName(currentFile.filePath)
+ currentIterator = readFunction(currentFile)
hasNext
} else {
+ currentFile = null
InputFileNameHolder.unsetInputFileName()
false
}
}
override def close() = {
+ updateBytesRead()
+ updateBytesReadWithFileSize()
InputFileNameHolder.unsetInputFileName()
}
}
// Register an on-task-completion callback to close the input stream.
- context.addTaskCompletionListener(context => iterator.close())
+ context.addTaskCompletionListener(_ => iterator.close())
iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
}
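Since the change was only verified manually in the Spark UI, below is a rough sketch of how the new metrics could be observed programmatically. It is not from the patch; the input path and the `sqlContext` handle are placeholders, and listener events arrive asynchronously, so it is illustrative rather than a ready-made test.

```scala
// Hedged sketch (not from the patch): collect per-task input metrics with a
// SparkListener and check that a file scan reports non-zero bytes/records read.
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val bytesRead = new AtomicLong(0L)
val recordsRead = new AtomicLong(0L)

sqlContext.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks, so guard before reading it.
    Option(taskEnd.taskMetrics).foreach { metrics =>
      bytesRead.addAndGet(metrics.inputMetrics.bytesRead)
      recordsRead.addAndGet(metrics.inputMetrics.recordsRead)
    }
  }
})

// Placeholder path; any file-based scan that goes through FileScanRDD would do.
sqlContext.read.parquet("/tmp/some-table").collect()

// Listener events are delivered asynchronously, so a real test would wait for the
// listener bus to drain before asserting bytesRead.get() > 0 && recordsRead.get() > 0.
```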