author     Cheng Lian <lian@databricks.com>    2016-04-21 21:48:09 -0700
committer  Davies Liu <davies.liu@gmail.com>   2016-04-21 21:48:09 -0700
commit     145433f1aaf4a58f484f98c2f1d32abd8cc95b48
tree       e910fe2d65ccc045c2f0b1d8c2e796ae1bd84975 /sql/hive
parent     b29bc3f51518806ef7827b35df7c8aada329f961
[SPARK-14369] [SQL] Locality support for FileScanRDD
(This PR is a rebased version of PR #12153.)
## What changes were proposed in this pull request?
This PR adds preliminary locality support for `FileFormat` data sources by overriding `FileScanRDD.preferredLocations()`. The strategy can be divided into two parts:
1. Block location lookup
Unlike `HadoopRDD` or `NewHadoopRDD`, `FileScanRDD` doesn't have access to the underlying `InputFormat` or `InputSplit`, and thus can't rely on `InputSplit.getLocations()` to gather locality information. Instead, this PR queries block locations using `FileSystem.getFileBlockLocations()` after listing all `FileStatus`es in `HDFSFileCatalog`, converting each `FileStatus` into a `LocatedFileStatus`.
Note that although the S3/S3A/S3N file systems don't provide valid locality information, their `listLocatedStatus()` implementations don't actually issue remote calls either, so there's no need to special-case these file systems.
2. Selecting preferred locations
For each `FilePartition`, we pick the top 3 locations containing the most data to be retrieved (both steps are sketched below). This isn't necessarily the best algorithm out there; further improvements may be brought up in follow-up PRs.
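To make the two steps concrete, here is a minimal sketch, assuming hypothetical helper names (`locate`, `preferredHosts`); the PR's actual logic lives in `FileScanRDD` and the file catalog and may differ in detail:

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

// Step 1 (sketch): enrich a plain FileStatus with block locations by querying
// the file system for the file's whole byte range.
def locate(fs: FileSystem, status: FileStatus): LocatedFileStatus = {
  val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
  new LocatedFileStatus(status, blocks)
}

// Step 2 (sketch): given (host, bytesToRead) pairs covering the blocks of one
// FilePartition, keep the three hosts that would serve the most data.
def preferredHosts(hostBytes: Seq[(String, Long)]): Seq[String] =
  hostBytes
    .groupBy { case (host, _) => host }
    .mapValues(_.map { case (_, bytes) => bytes }.sum) // total bytes per host
    .toSeq
    .sortBy { case (_, total) => -total }              // most data first
    .take(3)                                           // the "top 3 locations"
    .map { case (host, _) => host }
```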
## How was this patch tested?
Tested by overriding the default `FileSystem` implementation for `file:///` with a mocked one that returns fabricated block locations.
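Such a mock can be built by extending Hadoop's `RawLocalFileSystem` and fabricating a fresh host per block-location query. Below is a minimal sketch in the spirit of the `LocalityTestFileSystem` registered in the test diff further down; the class body is an assumption, not necessarily the PR's exact code:

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}

// Sketch: a local file system that reports a distinct fabricated host
// ("host0", "host1", ...) for each block-location query, so tests can make
// deterministic assertions about FileScanRDD.preferredLocations().
class MockLocalityFileSystem extends RawLocalFileSystem {
  private val invocations = new AtomicInteger(0)

  override def getFileBlockLocations(
      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
    val count = invocations.getAndIncrement()
    Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
  }
}
```

With one block-location query per data file, the two files written by the test would surface two distinct hosts, which matches its assertion that `preferredLocations.distinct.length == 2`.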
Author: Cheng Lian <lian@databricks.com>
Closes #12527 from liancheng/spark-14369-locality-rebased.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 40
1 file changed, 39 insertions(+), 1 deletion(-)
```diff
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 368fe62ff2..089cef615f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -28,7 +28,8 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -668,6 +669,43 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
     }
   }
+
+  test("Locality support for FileScanRDD") {
+    withHadoopConf(
+      "fs.file.impl" -> classOf[LocalityTestFileSystem].getName,
+      "fs.file.impl.disable.cache" -> "true"
+    ) {
+      withTempPath { dir =>
+        val path = "file://" + dir.getCanonicalPath
+        val df1 = sqlContext.range(4)
+        df1.coalesce(1).write.mode("overwrite").format(dataSourceName).save(path)
+        df1.coalesce(1).write.mode("append").format(dataSourceName).save(path)
+
+        def checkLocality(): Unit = {
+          val df2 = sqlContext.read
+            .format(dataSourceName)
+            .option("dataSchema", df1.schema.json)
+            .load(path)
+
+          val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst {
+            case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+              scan.rdd.asInstanceOf[FileScanRDD]
+          }
+
+          val partitions = fileScanRDD.partitions
+          val preferredLocations = partitions.flatMap(fileScanRDD.preferredLocations)
+
+          assert(preferredLocations.distinct.length == 2)
+        }
+
+        checkLocality()
+
+        withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+          checkLocality()
+        }
+      }
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when
```
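The test also leans on a `withHadoopConf` helper. A plausible standalone version is sketched below as an assumption (the actual helper in `SQLTestUtils` may differ): it applies the given keys to a Hadoop `Configuration` and restores the previous values afterwards. `fs.file.impl.disable.cache` is needed so Hadoop doesn't hand back a cached `file://` file system instance created before the override took effect.

```scala
import org.apache.hadoop.conf.Configuration

// Sketch: temporarily apply (key, value) pairs to a Hadoop Configuration and
// restore (or clear) the original values once the body finishes, even on error.
def withHadoopConf(conf: Configuration)(pairs: (String, String)*)(body: => Unit): Unit = {
  val previous = pairs.map { case (key, _) => key -> Option(conf.get(key)) }
  pairs.foreach { case (key, value) => conf.set(key, value) }
  try body finally previous.foreach {
    case (key, Some(old)) => conf.set(key, old)
    case (key, None)      => conf.unset(key)
  }
}
```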