author    Cheng Lian <lian@databricks.com>        2016-04-21 21:48:09 -0700
committer Davies Liu <davies.liu@gmail.com>       2016-04-21 21:48:09 -0700
commit    145433f1aaf4a58f484f98c2f1d32abd8cc95b48 (patch)
tree      e910fe2d65ccc045c2f0b1d8c2e796ae1bd84975 /sql/core/src/main/scala/org/apache
parent    b29bc3f51518806ef7827b35df7c8aada329f961 (diff)
[SPARK-14369] [SQL] Locality support for FileScanRDD
(This PR is a rebased version of PR #12153.)

## What changes were proposed in this pull request?

This PR adds preliminary locality support for `FileFormat` data sources by overriding `FileScanRDD.preferredLocations()`. The strategy can be divided into two parts:

1. Block location lookup

   Unlike `HadoopRDD` or `NewHadoopRDD`, `FileScanRDD` doesn't have access to the underlying `InputFormat` or `InputSplit`, and thus can't rely on `InputSplit.getLocations()` to gather locality information. Instead, this PR queries block locations using `FileSystem.getBlockLocations()` after listing all `FileStatus`es in `HDFSFileCatalog`, and converts them into `LocatedFileStatus`es. Note that although S3/S3A/S3N file systems don't provide valid locality information, their `getLocatedStatus()` implementations don't actually issue remote calls either, so there's no need to special-case these file systems.

2. Selecting preferred locations

   For each `FilePartition`, we pick the top 3 locations containing the most data to be retrieved. This isn't necessarily the best algorithm out there; further improvements may be brought up in follow-up PRs.

## How was this patch tested?

Tested by overriding the default `FileSystem` implementation for `file:///` with a mocked one that returns mocked block locations.

Author: Cheng Lian <lian@databricks.com>

Closes #12527 from liancheng/spark-14369-locality-rebased.
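As an illustration only, here is a minimal, self-contained Scala sketch of the two steps described above: looking up block locations for one file and then picking the top 3 hosts by retrievable bytes. The file path and the helper name `top3Hosts` are hypothetical and not part of the commit, which implements this logic inside `HDFSFileCatalog`, `FileSourceStrategy`, and `FileScanRDD` as shown in the diff below.

```scala
import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path}

object LocalitySketch {
  // Given (hosts, bytes) pairs for the files of one partition, sum the bytes
  // readable from each host and keep the 3 hosts with the most data, mirroring
  // the selection performed by FileScanRDD.getPreferredLocations in the diff.
  def top3Hosts(fileHosts: Seq[(Array[String], Long)]): Seq[String] = {
    val hostToNumBytes = mutable.HashMap.empty[String, Long]
    fileHosts.foreach { case (hosts, numBytes) =>
      hosts.filter(_ != "localhost").foreach { host =>
        hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + numBytes
      }
    }
    hostToNumBytes.toSeq.sortBy { case (_, numBytes) => -numBytes }.take(3).map(_._1)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical path; point it at a real HDFS file to see non-trivial hosts.
    val path = new Path("hdfs:///tmp/example.parquet")
    val fs = path.getFileSystem(new Configuration())
    val status = fs.getFileStatus(path)

    // Block location lookup, as HDFSFileCatalog does per file: a metadata call on
    // HDFS, while local and S3-style file systems answer without a remote RPC.
    val located =
      new LocatedFileStatus(status, fs.getFileBlockLocations(status, 0, status.getLen))

    val perBlock = located.getBlockLocations.map(b => b.getHosts -> b.getLength).toSeq
    println(top3Hosts(perBlock))
  }
}
```

The `localhost` filter matters because local and S3-style file systems typically report `localhost` as the only host, which carries no useful locality information.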
Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala           24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala     53
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala   84
3 files changed, 134 insertions, 27 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 90694d9af4..60238bd515 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import scala.collection.mutable
+
import org.apache.spark.{Partition => RDDPartition, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileNameHolder, RDD}
@@ -33,7 +35,8 @@ case class PartitionedFile(
partitionValues: InternalRow,
filePath: String,
start: Long,
- length: Long) {
+ length: Long,
+ locations: Array[String] = Array.empty) {
override def toString: String = {
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
}
@@ -131,4 +134,23 @@ class FileScanRDD(
}
override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray
+
+ override protected def getPreferredLocations(split: RDDPartition): Seq[String] = {
+ val files = split.asInstanceOf[FilePartition].files
+
+ // Computes the total number of bytes that can be retrieved from each host.
+ val hostToNumBytes = mutable.HashMap.empty[String, Long]
+ files.foreach { file =>
+ file.locations.filter(_ != "localhost").foreach { host =>
+ hostToNumBytes(host) = hostToNumBytes.getOrElse(host, 0L) + file.length
+ }
+ }
+
+ // Takes the first 3 hosts with the most data to be retrieved
+ hostToNumBytes.toSeq.sortBy {
+ case (host, numBytes) => numBytes
+ }.reverse.take(3).map {
+ case (host, numBytes) => host
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 80a9156ddc..ee48a7b81d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{DataSourceScan, SparkPlan}
-import org.apache.spark.sql.sources._
/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -120,7 +119,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
logInfo(s"Planning with ${bucketing.numBuckets} buckets")
val bucketed =
selectedPartitions.flatMap { p =>
- p.files.map(f => PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen))
+ p.files.map { f =>
+ val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
+ PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
+ }
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
@@ -139,10 +141,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
- (0L to file.getLen by maxSplitBytes).map { offset =>
+ val blockLocations = getBlockLocations(file)
+ (0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
- PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size)
+ val hosts = getBlockHosts(blockLocations, offset, size)
+ PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size, hosts)
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
@@ -207,4 +211,43 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
case _ => Nil
}
+
+ private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
+ case f: LocatedFileStatus => f.getBlockLocations
+ case f => Array.empty[BlockLocation]
+ }
+
+ // Given the locations of all blocks of a single file, `blockLocations`, and an `(offset, length)`
+ // pair that represents a segment of the same file, finds the block that contains the largest
+ // fraction of the segment and returns the location hosts of that block. If no such block can be
+ // found, returns an empty array.
+ private def getBlockHosts(
+ blockLocations: Array[BlockLocation], offset: Long, length: Long): Array[String] = {
+ val candidates = blockLocations.map {
+ // The fragment starts from a position within this block
+ case b if b.getOffset <= offset && offset < b.getOffset + b.getLength =>
+ b.getHosts -> (b.getOffset + b.getLength - offset).min(length)
+
+ // The fragment ends at a position within this block
+ case b if offset <= b.getOffset && offset + length < b.getLength =>
+ b.getHosts -> (offset + length - b.getOffset).min(length)
+
+ // The fragment fully contains this block
+ case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length =>
+ b.getHosts -> b.getLength
+
+ // The fragment doesn't intersect with this block
+ case b =>
+ b.getHosts -> 0L
+ }.filter { case (hosts, size) =>
+ size > 0L
+ }
+
+ if (candidates.isEmpty) {
+ Array.empty[String]
+ } else {
+ val (hosts, _) = candidates.maxBy { case (_, size) => size }
+ hosts
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index d37a939b54..ed24bdd77f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.util.Try
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -342,16 +342,31 @@ class HDFSFileCatalog(
if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
} else {
- val statuses = paths.flatMap { path =>
+ val statuses: Seq[FileStatus] = paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
logInfo(s"Listing $path on driver")
// Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass())
+ val jobConf = new JobConf(hadoopConf, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
- if (pathFilter != null) {
- Try(fs.listStatus(path, pathFilter)).getOrElse(Array.empty)
- } else {
- Try(fs.listStatus(path)).getOrElse(Array.empty)
+
+ val statuses = {
+ val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+ if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+ }
+
+ statuses.map {
+ case f: LocatedFileStatus => f
+
+ // NOTE:
+ //
+ // - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
+ // operations, calling `getFileBlockLocations` does no harm here since these file system
+ // implementations don't actually issue RPCs for this method.
+ //
+ // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not be
+ // a big deal since we always use `listLeafFilesInParallel` when the number of paths
+ // exceeds the threshold.
+ case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
}
}.filterNot { status =>
val name = status.getPath.getName
@@ -369,7 +384,7 @@ class HDFSFileCatalog(
}
}
- def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
+ def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
val leafDirs = leafDirToChildrenFiles.keys.toSeq
schema match {
@@ -473,15 +488,15 @@ private[sql] object HadoopFsRelation extends Logging {
// Dummy jobconf to get to the pathFilter defined in configuration
val jobConf = new JobConf(fs.getConf, this.getClass())
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
- val statuses =
- if (pathFilter != null) {
- val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
- files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
- } else {
- val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
- files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
- }
- statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
+ val statuses = {
+ val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
+ val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+ if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+ }
+ statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+ case f: LocatedFileStatus => f
+ case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+ }
}
}
@@ -489,6 +504,12 @@ private[sql] object HadoopFsRelation extends Logging {
// well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`.
// Here we use `FakeFileStatus` to extract key components of a `FileStatus` to serialize it from
// executor side and reconstruct it on driver side.
+ case class FakeBlockLocation(
+ names: Array[String],
+ hosts: Array[String],
+ offset: Long,
+ length: Long)
+
case class FakeFileStatus(
path: String,
length: Long,
@@ -496,7 +517,8 @@ private[sql] object HadoopFsRelation extends Logging {
blockReplication: Short,
blockSize: Long,
modificationTime: Long,
- accessTime: Long)
+ accessTime: Long,
+ blockLocations: Array[FakeBlockLocation])
def listLeafFilesInParallel(
paths: Seq[Path],
@@ -511,6 +533,20 @@ private[sql] object HadoopFsRelation extends Logging {
val fs = path.getFileSystem(serializableConfiguration.value)
Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty)
}.map { status =>
+ val blockLocations = status match {
+ case f: LocatedFileStatus =>
+ f.getBlockLocations.map { loc =>
+ FakeBlockLocation(
+ loc.getNames,
+ loc.getHosts,
+ loc.getOffset,
+ loc.getLength)
+ }
+
+ case _ =>
+ Array.empty[FakeBlockLocation]
+ }
+
FakeFileStatus(
status.getPath.toString,
status.getLen,
@@ -518,12 +554,18 @@ private[sql] object HadoopFsRelation extends Logging {
status.getReplication,
status.getBlockSize,
status.getModificationTime,
- status.getAccessTime)
+ status.getAccessTime,
+ blockLocations)
}.collect()
val hadoopFakeStatuses = fakeStatuses.map { f =>
- new FileStatus(
- f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
+ val blockLocations = f.blockLocations.map { loc =>
+ new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+ }
+ new LocatedFileStatus(
+ new FileStatus(
+ f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
+ blockLocations)
}
mutable.LinkedHashSet(hadoopFakeStatuses: _*)
}