aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorDaoyuan Wang <daoyuan.wang@intel.com>2015-04-15 14:06:10 -0700
committerMichael Armbrust <michael@databricks.com>2015-04-15 14:06:10 -0700
commit585638e81ce09a72b9e7f95d38e0d432cfa02456 (patch)
treeb26fb5702b77fd7124b952be806a4cfc039890ce /sql/catalyst/src
parent4754e16f4746ebd882b2ce7f1efc6e4d4408922c (diff)
downloadspark-585638e81ce09a72b9e7f95d38e0d432cfa02456.tar.gz
spark-585638e81ce09a72b9e7f95d38e0d432cfa02456.tar.bz2
spark-585638e81ce09a72b9e7f95d38e0d432cfa02456.zip
[SPARK-2213] [SQL] sort merge join for spark sql
Thanks for the initial work from Ishiihara in #3173 This PR introduce a new join method of sort merge join, which firstly ensure that keys of same value are in the same partition, and inside each partition the Rows are sorted by key. Then we can run down both sides together, find matched rows using [sort merge join](http://en.wikipedia.org/wiki/Sort-merge_join). In this way, we don't have to store the whole hash table of one side as hash join, thus we have less memory usage. Also, this PR would benefit from #3438 , making the sorting phrase much more efficient. We introduced a new configuration of "spark.sql.planner.sortMergeJoin" to switch between this(`true`) and ShuffledHashJoin(`false`), probably we want the default value of it be `false` at first. Author: Daoyuan Wang <daoyuan.wang@intel.com> Author: Michael Armbrust <michael@databricks.com> This patch had conflicts when merged, resolved by Committer: Michael Armbrust <michael@databricks.com> Closes #5208 from adrian-wang/smj and squashes the following commits: 2493b9f [Daoyuan Wang] fix style 5049d88 [Daoyuan Wang] propagate rowOrdering for RangePartitioning f91a2ae [Daoyuan Wang] yin's comment: use external sort if option is enabled, add comments f515cd2 [Daoyuan Wang] yin's comment: outputOrdering, join suite refine ec8061b [Daoyuan Wang] minor change 413fd24 [Daoyuan Wang] Merge pull request #3 from marmbrus/pr/5208 952168a [Michael Armbrust] add type 5492884 [Michael Armbrust] copy when ordering 7ddd656 [Michael Armbrust] Cleanup addition of ordering requirements b198278 [Daoyuan Wang] inherit ordering in project c8e82a3 [Daoyuan Wang] fix style 6e897dd [Daoyuan Wang] hide boundReference from manually construct RowOrdering for key compare in smj 8681d73 [Daoyuan Wang] refactor Exchange and fix copy for sorting 2875ef2 [Daoyuan Wang] fix changed configuration 61d7f49 [Daoyuan Wang] add omitted comment 00a4430 [Daoyuan Wang] fix bug 078d69b [Daoyuan Wang] address comments: add comments, do sort in shuffle, and others 3af6ba5 [Daoyuan Wang] use buffer for only one side 171001f [Daoyuan Wang] change default outputordering 47455c9 [Daoyuan Wang] add apache license ... a28277f [Daoyuan Wang] fix style 645c70b [Daoyuan Wang] address comments using sort 068c35d [Daoyuan Wang] fix new style and add some tests 925203b [Daoyuan Wang] address comments 07ce92f [Daoyuan Wang] fix ArrayIndexOutOfBound 42fca0e [Daoyuan Wang] code clean e3ec096 [Daoyuan Wang] fix comment style.. 2edd235 [Daoyuan Wang] fix outputpartitioning 57baa40 [Daoyuan Wang] fix sort eval bug 303b6da [Daoyuan Wang] fix several errors 95db7ad [Daoyuan Wang] fix brackets for if-statement 4464f16 [Daoyuan Wang] fix error 880d8e9 [Daoyuan Wang] sort merge join for spark sql
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala10
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala13
2 files changed, 21 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 1b62e17ff4..b6ec7d3417 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -17,8 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.types.{UTF8String, StructType, NativeType}
-
+import org.apache.spark.sql.types.{UTF8String, DataType, StructType, NativeType}
/**
* An extended interface to [[Row]] that allows the values for each column to be updated. Setting
@@ -239,3 +238,10 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
return 0
}
}
+
+object RowOrdering {
+ def forSchema(dataTypes: Seq[DataType]): RowOrdering =
+ new RowOrdering(dataTypes.zipWithIndex.map {
+ case(dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+ })
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 288c11f69f..fb4217a448 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -94,6 +94,9 @@ sealed trait Partitioning {
* only compatible if the `numPartitions` of them is the same.
*/
def compatibleWith(other: Partitioning): Boolean
+
+ /** Returns the expressions that are used to key the partitioning. */
+ def keyExpressions: Seq[Expression]
}
case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
@@ -106,6 +109,8 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
case UnknownPartitioning(_) => true
case _ => false
}
+
+ override def keyExpressions: Seq[Expression] = Nil
}
case object SinglePartition extends Partitioning {
@@ -117,6 +122,8 @@ case object SinglePartition extends Partitioning {
case SinglePartition => true
case _ => false
}
+
+ override def keyExpressions: Seq[Expression] = Nil
}
case object BroadcastPartitioning extends Partitioning {
@@ -128,6 +135,8 @@ case object BroadcastPartitioning extends Partitioning {
case SinglePartition => true
case _ => false
}
+
+ override def keyExpressions: Seq[Expression] = Nil
}
/**
@@ -158,6 +167,8 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
case _ => false
}
+ override def keyExpressions: Seq[Expression] = expressions
+
override def eval(input: Row = null): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
@@ -200,6 +211,8 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
case _ => false
}
+ override def keyExpressions: Seq[Expression] = ordering.map(_.child)
+
override def eval(input: Row): EvaluatedType =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}