path: root/sql/core/src
author    Tejas Patil <tejasp@fb.com>    2016-09-10 09:27:22 +0800
committer Wenchen Fan <wenchen@databricks.com>    2016-09-10 09:27:22 +0800
commit    335491704c526921da3b3c5035175677ba5b92de (patch)
tree      43194c82c849831ea54c96c012aef4f6f04d49ca /sql/core/src
parent    f7d2143705c8c1baeed0bc62940f9dba636e705b (diff)
[SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` information
## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-15453

Extract sort ordering information in `FileSourceScanExec` so that the planner can make use of it. The motivation for this change is to bring sort merge join on par with Hive's Sort-Merge-Bucket join when the source tables are bucketed and sorted.

Query:

```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
```

Before:

```
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```

After (note that the `Sort` step is no longer there):

```
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```

## How was this patch tested?

Added a test case in `JoinSuite` and ran all other tests in `JoinSuite`.

Author: Tejas Patil <tejasp@fb.com>

Closes #14864 from tejasapatil/SPARK-15453_smb_optimization.
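The core of the change is a prefix rule: only the leading run of the table's sort columns that are actually read can be reported as `outputOrdering`. The following is a minimal, editor-added sketch of that rule, not part of the commit; `Attr`, `output`, and `usableSortColumns` are hypothetical stand-ins for the Catalyst types and logic in the diff below.

```scala
// Illustration only: mirrors the takeWhile-based prefix rule the patch adds to
// FileSourceScanExec for deciding which sort columns survive into outputOrdering.
object SortPrefixSketch extends App {
  case class Attr(name: String)                               // stand-in for a Catalyst Attribute
  val output = Seq(Attr("col0"), Attr("col2"), Attr("col3"))  // columns read from the table

  def toAttribute(colName: String): Option[Attr] = output.find(_.name == colName)

  // Keep only the leading prefix of sort columns that resolve against the read columns.
  def usableSortColumns(sortColumnNames: Seq[String]): Seq[Attr] =
    sortColumnNames.map(toAttribute).takeWhile(_.isDefined).map(_.get)

  println(usableSortColumns(Seq("col0", "col1")))  // List(Attr(col0)) -- prefix col0 is usable
  println(usableSortColumns(Seq("col1", "col0")))  // List()           -- col1 not read, no ordering
}
```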
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala  79
1 file changed, 62 insertions, 17 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 9597bdf34b..6cdba40693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,24 +155,72 @@ case class FileSourceScanExec(
false
}
- override val outputPartitioning: Partitioning = {
+ @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+
+ override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
relation.bucketSpec
} else {
None
}
- bucketSpec.map { spec =>
- val numBuckets = spec.numBuckets
- val bucketColumns = spec.bucketColumnNames.flatMap { n =>
- output.find(_.name == n)
- }
- if (bucketColumns.size == spec.bucketColumnNames.size) {
- HashPartitioning(bucketColumns, numBuckets)
- } else {
- UnknownPartitioning(0)
- }
- }.getOrElse {
- UnknownPartitioning(0)
+ bucketSpec match {
+ case Some(spec) =>
+ // For bucketed columns:
+ // -----------------------
+ // `HashPartitioning` would be used only when:
+ // 1. ALL the bucketing columns are being read from the table
+ //
+ // For sorted columns:
+ // ---------------------
+ // Sort ordering should be used when ALL of these criteria match:
+ // 1. `HashPartitioning` is being used
+ // 2. A prefix (or all) of the sort columns are being read from the table.
+ //
+ // Sort ordering would be over the prefix subset of `sort columns` being read
+ // from the table.
+ // eg.
+ // Assume (col0, col2, col3) are the columns read from the table
+ // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+ // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+ // above
+
+ def toAttribute(colName: String): Option[Attribute] =
+ output.find(_.name == colName)
+
+ val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+ if (bucketColumns.size == spec.bucketColumnNames.size) {
+ val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+ val sortColumns =
+ spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+
+ val sortOrder = if (sortColumns.nonEmpty) {
+ // In case of bucketing, it's possible to have multiple files belonging to the
+ // same bucket in a given relation. Each of these files is locally sorted,
+ // but those files combined together are not globally sorted. Given that,
+ // the RDD partition will not be sorted even if the relation has sort columns set.
+ // The current solution is to check whether every bucket contains a single file.
+
+ val files = selectedPartitions.flatMap(partition => partition.files)
+ val bucketToFilesGrouping =
+ files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+ val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+ if (singleFilePartitions) {
+ // TODO: Currently Spark does not support writing columns sorted in descending order,
+ // so Ascending order is used here. This can be fixed in the future.
+ sortColumns.map(attribute => SortOrder(attribute, Ascending))
+ } else {
+ Nil
+ }
+ } else {
+ Nil
+ }
+ (partitioning, sortOrder)
+ } else {
+ (UnknownPartitioning(0), Nil)
+ }
+ case _ =>
+ (UnknownPartitioning(0), Nil)
}
}
@@ -187,8 +234,6 @@ case class FileSourceScanExec(
"InputPaths" -> relation.location.paths.mkString(", "))
private lazy val inputRDD: RDD[InternalRow] = {
- val selectedPartitions = relation.location.listFiles(partitionFilters)
-
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,