author     Tejas Patil <tejasp@fb.com>           2016-09-10 09:27:22 +0800
committer  Wenchen Fan <wenchen@databricks.com>  2016-09-10 09:27:22 +0800
commit     335491704c526921da3b3c5035175677ba5b92de
tree       43194c82c849831ea54c96c012aef4f6f04d49ca /sql/core/src
parent     f7d2143705c8c1baeed0bc62940f9dba636e705b
[SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` information
## What changes were proposed in this pull request?
Jira : https://issues.apache.org/jira/browse/SPARK-15453
Extract sort ordering information in `FileSourceScanExec` so that the planner can make use of it. My motivation for this change was to bring Sort Merge join on par with Hive's Sort-Merge-Bucket join when the source tables are bucketed + sorted.
Query:
```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
```
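Note that the `coalesce(1)` is not incidental: with a single write task, every bucket lands in exactly one file, and (as the patch below checks) the sort ordering is only reported when each bucket has at most one file, since several locally sorted files in one bucket are not globally sorted. A hedged counter-example where the optimization would not kick in (`table_multi` is a hypothetical table name, not part of this patch):
```
// Written from multiple tasks, a bucket may receive several locally sorted
// files, so the scan reports no ordering and the Sort stays in the plan.
val dfMulti = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k")
dfMulti.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table_multi")
```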
Before:
```
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
: +- *Project [i#119, j#120, k#121]
: +- *Filter (isnotnull(k#121) && isnotnull(j#120))
: +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
+- *Project [i#122, j#123, k#124]
+- *Filter (isnotnull(k#124) && isnotnull(j#123))
+- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```
After (note that the `Sort` steps are no longer there):
```
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
: +- *Filter (isnotnull(k#50) && isnotnull(j#49))
: +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
+- *Filter (isnotnull(j#52) && isnotnull(k#53))
+- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```
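The `Sort` nodes disappear because the scan itself now reports an `outputOrdering` that already satisfies the join's required ordering, so the planner has nothing left to enforce. As a hedged sketch (assuming `spark` is an active `SparkSession` and the bucketed `table8` from above exists), the reported ordering can be inspected directly:
```
import org.apache.spark.sql.execution.FileSourceScanExec

// Find the leaf scan in the physical plan and print the ordering it reports.
val maybeScan = spark.table("table8").queryExecution.executedPlan.collectFirst {
  case scan: FileSourceScanExec => scan
}
maybeScan.foreach(scan => println(scan.outputOrdering))
// Expected with this patch: something like List(j#... ASC, k#... ASC)
```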
## How was this patch tested?
Added a test case in `JoinSuite` and ran all the other tests in `JoinSuite`.
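A minimal sketch of what such a test could assert (the actual `JoinSuite` test may differ; `spark` is an assumed active `SparkSession` with the two bucketed tables from above):
```
import org.apache.spark.sql.execution.SortExec

// With both sides bucketed and sorted on the join keys, no Sort should be
// planned below the SortMergeJoin any more.
val plan = spark.sql(
  "SELECT * FROM table8 a JOIN table9 b ON a.j = b.j AND a.k = b.k")
  .queryExecution.executedPlan
assert(plan.collect { case s: SortExec => s }.isEmpty)
```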
Author: Tejas Patil <tejasp@fb.com>
Closes #14864 from tejasapatil/SPARK-15453_smb_optimization.
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala  79
1 file changed, 62 insertions(+), 17 deletions(-)
```
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 9597bdf34b..6cdba40693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,24 +155,72 @@ case class FileSourceScanExec(
     false
   }
 
-  override val outputPartitioning: Partitioning = {
+  @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+
+  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
       relation.bucketSpec
     } else {
       None
     }
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
-        output.find(_.name == n)
-      }
-      if (bucketColumns.size == spec.bucketColumnNames.size) {
-        HashPartitioning(bucketColumns, numBuckets)
-      } else {
-        UnknownPartitioning(0)
-      }
-    }.getOrElse {
-      UnknownPartitioning(0)
+    bucketSpec match {
+      case Some(spec) =>
+        // For bucketed columns:
+        // -----------------------
+        // `HashPartitioning` would be used only when:
+        //   1. ALL the bucketing columns are being read from the table
+        //
+        // For sorted columns:
+        // ---------------------
+        // Sort ordering should be used when ALL these criteria's match:
+        //   1. `HashPartitioning` is being used
+        //   2. A prefix (or all) of the sort columns are being read from the table.
+        //
+        // Sort ordering would be over the prefix subset of `sort columns` being read
+        // from the table.
+        // eg.
+        // Assume (col0, col2, col3) are the columns read from the table
+        // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+        // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+        // above
+
+        def toAttribute(colName: String): Option[Attribute] =
+          output.find(_.name == colName)
+
+        val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+        if (bucketColumns.size == spec.bucketColumnNames.size) {
+          val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+          val sortColumns =
+            spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+
+          val sortOrder = if (sortColumns.nonEmpty) {
+            // In case of bucketing, its possible to have multiple files belonging to the
+            // same bucket in a given relation. Each of these files are locally sorted
+            // but those files combined together are not globally sorted. Given that,
+            // the RDD partition will not be sorted even if the relation has sort columns set
+            // Current solution is to check if all the buckets have a single file in it
+
+            val files = selectedPartitions.flatMap(partition => partition.files)
+            val bucketToFilesGrouping =
+              files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+            val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+            if (singleFilePartitions) {
+              // TODO Currently Spark does not support writing columns sorting in descending order
+              // so using Ascending order. This can be fixed in future
+              sortColumns.map(attribute => SortOrder(attribute, Ascending))
+            } else {
+              Nil
+            }
+          } else {
+            Nil
+          }
+          (partitioning, sortOrder)
+        } else {
+          (UnknownPartitioning(0), Nil)
+        }
+      case _ =>
+        (UnknownPartitioning(0), Nil)
     }
   }
 
@@ -187,8 +234,6 @@ case class FileSourceScanExec(
     "InputPaths" -> relation.location.paths.mkString(", "))
 
   private lazy val inputRDD: RDD[InternalRow] = {
-    val selectedPartitions = relation.location.listFiles(partitionFilters)
-
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
```
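The prefix rule described in the comment above can be demonstrated with a self-contained sketch (`usableSortPrefix` and the column names are illustrative, not part of the patch):
```
// Columns actually read from the table.
val readColumns = Seq("col0", "col2", "col3")

// Only the longest prefix of the table's sort columns that is also read can
// be reported as the output ordering, mirroring the takeWhile in the patch.
def usableSortPrefix(sortColumnNames: Seq[String]): Seq[String] =
  sortColumnNames
    .map(name => readColumns.find(_ == name))
    .takeWhile(_.isDefined)
    .map(_.get)

assert(usableSortPrefix(Seq("col0", "col1")) == Seq("col0"))  // prefix (col0)
assert(usableSortPrefix(Seq("col1", "col0")) == Nil)          // col1 is not read
```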