author     Cheng Lian <lian@databricks.com>       2016-04-21 21:48:09 -0700
committer  Davies Liu <davies.liu@gmail.com>      2016-04-21 21:48:09 -0700
commit     145433f1aaf4a58f484f98c2f1d32abd8cc95b48 (patch)
tree       e910fe2d65ccc045c2f0b1d8c2e796ae1bd84975 /sql/hive
parent     b29bc3f51518806ef7827b35df7c8aada329f961 (diff)
[SPARK-14369] [SQL] Locality support for FileScanRDD
(This PR is a rebased version of PR #12153.)

## What changes were proposed in this pull request?

This PR adds preliminary locality support for `FileFormat` data sources by overriding `FileScanRDD.preferredLocations()`. The strategy can be divided into two parts:

1. Block location lookup

   Unlike `HadoopRDD` or `NewHadoopRDD`, `FileScanRDD` doesn't have access to the underlying `InputFormat` or `InputSplit`, and thus can't rely on `InputSplit.getLocations()` to gather locality information. Instead, this PR queries block locations using `FileSystem.getBlockLocations()` after listing all `FileStatus`es in `HDFSFileCatalog`, and converts them into `LocatedFileStatus`es. Note that although the S3/S3A/S3N file systems don't provide valid locality information, their `listLocatedStatus()` implementations don't actually issue remote calls either, so there's no need to special-case these file systems.

2. Selecting preferred locations

   For each `FilePartition`, we pick the top 3 locations containing the most data to be retrieved. This isn't necessarily the best algorithm out there; further improvements may be brought up in follow-up PRs.

## How was this patch tested?

Tested by overriding the default `FileSystem` implementation for `file:///` with a mocked one that returns fabricated block locations.

Author: Cheng Lian <lian@databricks.com>

Closes #12527 from liancheng/spark-14369-locality-rebased.
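To make the two steps above concrete, here is a minimal, self-contained Scala sketch of the same idea — not the actual patch (the real logic lives in `HDFSFileCatalog` and `FileScanRDD`); the helper names `listWithLocations`, `pickPreferredLocations`, and `bytesByHost` are hypothetical:

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, Path}

object LocalitySketch {
  // Step 1: list a directory and attach block locations, upgrading each
  // plain FileStatus into a LocatedFileStatus via getFileBlockLocations().
  def listWithLocations(fs: FileSystem, dir: Path): Seq[LocatedFileStatus] =
    fs.listStatus(dir).toSeq.map {
      case located: LocatedFileStatus => located // already carries locations
      case status =>
        val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
        new LocatedFileStatus(status, blocks)
    }

  // Step 2: given how many bytes of one partition's data each host stores,
  // keep the three hosts holding the most of it.
  def pickPreferredLocations(bytesByHost: Map[String, Long]): Seq[String] =
    bytesByHost.toSeq.sortBy(-_._2).take(3).map(_._1)
}
```

Sorting hosts by byte count and truncating to three mirrors the "top 3 locations containing the most data" rule; since the host set per partition is small, the sort cost is negligible.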
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 40
1 file changed, 39 insertions(+), 1 deletion(-)
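The test added in the diff below swaps the `file:` scheme over to `LocalityTestFileSystem`, whose source is not part of this diff. As a hedged sketch of what such a mock might look like (the class name and host-name scheme here are illustrative, not necessarily Spark's actual implementation):

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}

// A local file system that fabricates a fresh host on every block-location
// lookup, making FileScanRDD.preferredLocations() observable in a local test.
class FakeLocalityFileSystem extends RawLocalFileSystem {
  private val invocations = new AtomicInteger(0)

  override def getFileBlockLocations(
      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
    val count = invocations.getAndIncrement()
    // BlockLocation(names, hosts, offset, length)
    Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
  }
}
```

Because the test writes two files and each lookup invents a new host, the scan reports exactly two distinct preferred locations, which is what the `preferredLocations.distinct.length == 2` assertion checks; `fs.file.impl.disable.cache` is set so Hadoop hands back the mock rather than a cached `RawLocalFileSystem` instance.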
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 368fe62ff2..089cef615f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -28,7 +28,8 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -668,6 +669,43 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with TestHiveSingleton
       df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
     }
   }
+
+  test("Locality support for FileScanRDD") {
+    withHadoopConf(
+      "fs.file.impl" -> classOf[LocalityTestFileSystem].getName,
+      "fs.file.impl.disable.cache" -> "true"
+    ) {
+      withTempPath { dir =>
+        val path = "file://" + dir.getCanonicalPath
+        val df1 = sqlContext.range(4)
+        df1.coalesce(1).write.mode("overwrite").format(dataSourceName).save(path)
+        df1.coalesce(1).write.mode("append").format(dataSourceName).save(path)
+
+        def checkLocality(): Unit = {
+          val df2 = sqlContext.read
+            .format(dataSourceName)
+            .option("dataSchema", df1.schema.json)
+            .load(path)
+
+          val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst {
+            case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+              scan.rdd.asInstanceOf[FileScanRDD]
+          }
+
+          val partitions = fileScanRDD.partitions
+          val preferredLocations = partitions.flatMap(fileScanRDD.preferredLocations)
+
+          assert(preferredLocations.distinct.length == 2)
+        }
+
+        checkLocality()
+
+        withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+          checkLocality()
+        }
+      }
+    }
+  }
 }
// This class is used to test SPARK-8578. We should not use any custom output committer when