Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala | 60
1 file changed, 53 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 468e101fed..f86911e002 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,14 +18,16 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileNameHolder, RDD}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch
 
 /**
  * A single file that should be read, along with partition column values that
  * need to be prepended to each row. The reading should start at the first
- * valid record found after `offset`.
+ * valid record found after `start`.
  */
 case class PartitionedFile(
     partitionValues: InternalRow,
@@ -53,33 +55,77 @@ class FileScanRDD(
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
+      private val inputMetrics = context.taskMetrics().inputMetrics
+      private val existingBytesRead = inputMetrics.bytesRead
+
+      // Find a function that will return the FileSystem bytes read by this thread. Do this before
+      // apply readFunction, because it might read some bytes.
+      private val getBytesReadCallback: Option[() => Long] =
+        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+
+      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid override values written by
+      // previous partitions (SPARK-13071).
+      private def updateBytesRead(): Unit = {
+        getBytesReadCallback.foreach { getBytesRead =>
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
+        }
+      }
+
+      // If we can't get the bytes read from the FS stats, fall back to the file size,
+      // which may be inaccurate.
+      private def updateBytesReadWithFileSize(): Unit = {
+        if (getBytesReadCallback.isEmpty && currentFile != null) {
+          inputMetrics.incBytesRead(currentFile.length)
+        }
+      }
+
       private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
 
+      private[this] var currentFile: PartitionedFile = null
       private[this] var currentIterator: Iterator[Object] = null
 
       def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
-      def next() = currentIterator.next()
+      def next() = {
+        val nextElement = currentIterator.next()
+        // TODO: we should have a better separation of row based and batch based scan, so that we
+        // don't need to run this `if` for every record.
+        if (nextElement.isInstanceOf[ColumnarBatch]) {
+          inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
+        } else {
+          inputMetrics.incRecordsRead(1)
+        }
+        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+          updateBytesRead()
+        }
+        nextElement
+      }
 
       /** Advances to the next file. Returns true if a new non-empty iterator is available. */
       private def nextIterator(): Boolean = {
+        updateBytesReadWithFileSize()
         if (files.hasNext) {
-          val nextFile = files.next()
-          logInfo(s"Reading File $nextFile")
-          InputFileNameHolder.setInputFileName(nextFile.filePath)
-          currentIterator = readFunction(nextFile)
+          currentFile = files.next()
+          logInfo(s"Reading File $currentFile")
+          InputFileNameHolder.setInputFileName(currentFile.filePath)
+          currentIterator = readFunction(currentFile)
           hasNext
         } else {
+          currentFile = null
           InputFileNameHolder.unsetInputFileName()
           false
         }
       }
 
       override def close() = {
+        updateBytesRead()
+        updateBytesReadWithFileSize()
         InputFileNameHolder.unsetInputFileName()
       }
     }
 
     // Register an on-task-completion callback to close the input stream.
-    context.addTaskCompletionListener(context => iterator.close())
+    context.addTaskCompletionListener(_ => iterator.close())
 
     iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
   }
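
The heart of the patch is the interplay between the `existingBytesRead` snapshot, the thread-local FileSystem statistics callback, and the periodic `updateBytesRead()` calls. The sketch below is a minimal, self-contained illustration of that pattern, not the Spark source: `ThreadLocalBytesRead` is a hypothetical stand-in for Hadoop's thread-local FileSystem statistics, and `SimpleInputMetrics` stands in for Spark's `InputMetrics`.

// Hypothetical stand-in for Hadoop's thread-local FileSystem statistics
// (what SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() taps into).
object ThreadLocalBytesRead {
  private val bytes = new ThreadLocal[Long] { override def initialValue: Long = 0L }
  def add(n: Long): Unit = bytes.set(bytes.get + n) // called by the "filesystem"
  def get: Long = bytes.get
}

// Hypothetical stand-in for Spark's InputMetrics.
class SimpleInputMetrics {
  var bytesRead: Long = 0L
  var recordsRead: Long = 0L
}

// Wraps a record iterator and keeps bytesRead correct even when several
// partitions are computed back to back in the same thread (the SPARK-13071
// coalesce scenario): the bytes recorded before this partition started are
// snapshotted once, and every update is an absolute write on top of that
// baseline rather than a blind increment.
class MetricsIterator[T](
    underlying: Iterator[T],
    metrics: SimpleInputMetrics,
    updateInterval: Int = 1000) extends Iterator[T] {

  private val existingBytesRead = metrics.bytesRead     // recorded by earlier partitions
  private val threadBaseline = ThreadLocalBytesRead.get // FS bytes before we start

  private def updateBytesRead(): Unit =
    metrics.bytesRead = existingBytesRead + (ThreadLocalBytesRead.get - threadBaseline)

  override def hasNext: Boolean = underlying.hasNext

  override def next(): T = {
    val elem = underlying.next()
    metrics.recordsRead += 1
    // Refresh bytesRead only every `updateInterval` records; consulting the
    // thread-local stats on every record would be needless overhead.
    if (metrics.recordsRead % updateInterval == 0) updateBytesRead()
    elem
  }

  def close(): Unit = updateBytesRead() // final flush, mirroring close() in the patch
}

Two partitions processed in one thread then compose correctly: the second iterator's `existingBytesRead` snapshot already contains the first one's final count, so the absolute writes never clobber it. The fallback path in the patch (`updateBytesReadWithFileSize()`) exists because, per the code comment, only Hadoop 2.5+ exposes these thread-local statistics; on older versions the file length is the best available estimate.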