aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorTejas Patil <tejasp@fb.com>2016-09-10 09:27:22 +0800
committerWenchen Fan <wenchen@databricks.com>2016-09-10 09:27:22 +0800
commit335491704c526921da3b3c5035175677ba5b92de (patch)
tree43194c82c849831ea54c96c012aef4f6f04d49ca /sql
parentf7d2143705c8c1baeed0bc62940f9dba636e705b (diff)
downloadspark-335491704c526921da3b3c5035175677ba5b92de.tar.gz
spark-335491704c526921da3b3c5035175677ba5b92de.tar.bz2
spark-335491704c526921da3b3c5035175677ba5b92de.zip
[SPARK-15453][SQL] FileSourceScanExec to extract `outputOrdering` information
## What changes were proposed in this pull request? Jira : https://issues.apache.org/jira/browse/SPARK-15453 Extracting sort ordering information in `FileSourceScanExec` so that planner can make use of it. My motivation to make this change was to get Sort Merge join in par with Hive's Sort-Merge-Bucket join when the source tables are bucketed + sorted. Query: ``` val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1) df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8") df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9") context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true) ``` Before: ``` == Physical Plan == *SortMergeJoin [j#120, k#121], [j#123, k#124], Inner :- *Sort [j#120 ASC, k#121 ASC], false, 0 : +- *Project [i#119, j#120, k#121] : +- *Filter (isnotnull(k#121) && isnotnull(j#120)) : +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string> +- *Sort [j#123 ASC, k#124 ASC], false, 0 +- *Project [i#122, j#123, k#124] +- *Filter (isnotnull(k#124) && isnotnull(j#123)) +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string> ``` After: (note that the `Sort` step is no longer there) ``` == Physical Plan == *SortMergeJoin [j#49, k#50], [j#52, k#53], Inner :- *Project [i#48, j#49, k#50] : +- *Filter (isnotnull(k#50) && isnotnull(j#49)) : +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string> +- *Project [i#51, j#52, k#53] +- *Filter (isnotnull(j#52) && isnotnull(k#53)) +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string> ``` ## How was this patch tested? Added a test case in `JoinSuite`. Ran all other tests in `JoinSuite` Author: Tejas Patil <tejasp@fb.com> Closes #14864 from tejasapatil/SPARK-15453_smb_optimization.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala79
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala63
2 files changed, 123 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 9597bdf34b..6cdba40693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,12 +23,11 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -156,24 +155,72 @@ case class FileSourceScanExec(
false
}
- override val outputPartitioning: Partitioning = {
+ @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+
+ override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
relation.bucketSpec
} else {
None
}
- bucketSpec.map { spec =>
- val numBuckets = spec.numBuckets
- val bucketColumns = spec.bucketColumnNames.flatMap { n =>
- output.find(_.name == n)
- }
- if (bucketColumns.size == spec.bucketColumnNames.size) {
- HashPartitioning(bucketColumns, numBuckets)
- } else {
- UnknownPartitioning(0)
- }
- }.getOrElse {
- UnknownPartitioning(0)
+ bucketSpec match {
+ case Some(spec) =>
+ // For bucketed columns:
+ // -----------------------
+ // `HashPartitioning` would be used only when:
+ // 1. ALL the bucketing columns are being read from the table
+ //
+ // For sorted columns:
+ // ---------------------
+ // Sort ordering should be used when ALL these criteria's match:
+ // 1. `HashPartitioning` is being used
+ // 2. A prefix (or all) of the sort columns are being read from the table.
+ //
+ // Sort ordering would be over the prefix subset of `sort columns` being read
+ // from the table.
+ // eg.
+ // Assume (col0, col2, col3) are the columns read from the table
+ // If sort columns are (col0, col1), then sort ordering would be considered as (col0)
+ // If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
+ // above
+
+ def toAttribute(colName: String): Option[Attribute] =
+ output.find(_.name == colName)
+
+ val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
+ if (bucketColumns.size == spec.bucketColumnNames.size) {
+ val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+ val sortColumns =
+ spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
+
+ val sortOrder = if (sortColumns.nonEmpty) {
+ // In case of bucketing, its possible to have multiple files belonging to the
+ // same bucket in a given relation. Each of these files are locally sorted
+ // but those files combined together are not globally sorted. Given that,
+ // the RDD partition will not be sorted even if the relation has sort columns set
+ // Current solution is to check if all the buckets have a single file in it
+
+ val files = selectedPartitions.flatMap(partition => partition.files)
+ val bucketToFilesGrouping =
+ files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+ val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
+
+ if (singleFilePartitions) {
+ // TODO Currently Spark does not support writing columns sorting in descending order
+ // so using Ascending order. This can be fixed in future
+ sortColumns.map(attribute => SortOrder(attribute, Ascending))
+ } else {
+ Nil
+ }
+ } else {
+ Nil
+ }
+ (partitioning, sortOrder)
+ } else {
+ (UnknownPartitioning(0), Nil)
+ }
+ case _ =>
+ (UnknownPartitioning(0), Nil)
}
}
@@ -187,8 +234,6 @@ case class FileSourceScanExec(
"InputPaths" -> relation.location.paths.mkString(", "))
private lazy val inputRDD: RDD[InternalRow] = {
- val selectedPartitions = relation.location.listFiles(partitionFilters)
-
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index ca2ec9f6a5..3ff85176de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -237,7 +237,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
bucketSpecRight: Option[BucketSpec],
joinColumns: Seq[String],
shuffleLeft: Boolean,
- shuffleRight: Boolean): Unit = {
+ shuffleRight: Boolean,
+ sortLeft: Boolean = true,
+ sortRight: Boolean = true): Unit = {
withTable("bucketed_table1", "bucketed_table2") {
def withBucket(
writer: DataFrameWriter[Row],
@@ -247,6 +249,15 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
spec.numBuckets,
spec.bucketColumnNames.head,
spec.bucketColumnNames.tail: _*)
+
+ if (spec.sortColumnNames.nonEmpty) {
+ writer.sortBy(
+ spec.sortColumnNames.head,
+ spec.sortColumnNames.tail: _*
+ )
+ } else {
+ writer
+ }
}.getOrElse(writer)
}
@@ -267,12 +278,21 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
+ // check existence of shuffle
assert(
joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}")
assert(
joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight,
s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}")
+
+ // check existence of sort
+ assert(
+ joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
+ s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
+ assert(
+ joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
+ s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
}
}
}
@@ -321,6 +341,45 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
}
}
+ test("avoid shuffle and sort when bucket and sort columns are join keys") {
+ val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ testBucketing(
+ bucketSpec, bucketSpec, Seq("i", "j"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = false
+ )
+ }
+
+ test("avoid shuffle and sort when sort columns are a super set of join keys") {
+ val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
+ val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
+ testBucketing(
+ bucketSpec1, bucketSpec2, Seq("i"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = false
+ )
+ }
+
+ test("only sort one side when sort columns are different") {
+ val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
+ testBucketing(
+ bucketSpec1, bucketSpec2, Seq("i", "j"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = true
+ )
+ }
+
+ test("only sort one side when sort columns are same but their ordering is different") {
+ val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
+ val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
+ testBucketing(
+ bucketSpec1, bucketSpec2, Seq("i", "j"),
+ shuffleLeft = false, shuffleRight = false,
+ sortLeft = false, sortRight = true
+ )
+ }
+
test("avoid shuffle when grouping keys are equal to bucket keys") {
withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table")