diff options
author | Daoyuan <daoyuan.wang@intel.com> | 2014-06-11 12:08:28 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-06-11 12:09:42 -0700 |
commit | ce6deb1e5b4cd40c97730fcf5dc89cb2f624bce2 (patch) | |
tree | e6486802508d79cc289cc3e050289635bc3e02f0 /sql/core | |
parent | 4107cce58c41160a0dc20339621eacdf8a8b1191 (diff) | |
download | spark-ce6deb1e5b4cd40c97730fcf5dc89cb2f624bce2.tar.gz spark-ce6deb1e5b4cd40c97730fcf5dc89cb2f624bce2.tar.bz2 spark-ce6deb1e5b4cd40c97730fcf5dc89cb2f624bce2.zip |
[SQL] Code Cleanup: Left Semi Hash Join
Some improvement for PR #837, add another case to white list and use `filter` to build result iterator.
Author: Daoyuan <daoyuan.wang@intel.com>
Closes #1049 from adrian-wang/clean-LeftSemiJoinHash and squashes the following commits:
b314d5a [Daoyuan] change hashSet name
27579a9 [Daoyuan] add semijoin to white list and use filter to create new iterator in LeftSemiJoinBNL
Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala | 40 |
1 files changed, 7 insertions, 33 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 88ff3d49a7..8d7a5ba59f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -169,7 +169,7 @@ case class LeftSemiJoinHash( def execute() = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => - val hashTable = new java.util.HashSet[Row]() + val hashSet = new java.util.HashSet[Row]() var currentRow: Row = null // Create a Hash set of buildKeys @@ -177,43 +177,17 @@ case class LeftSemiJoinHash( currentRow = buildIter.next() val rowKey = buildSideKeyGenerator(currentRow) if(!rowKey.anyNull) { - val keyExists = hashTable.contains(rowKey) + val keyExists = hashSet.contains(rowKey) if (!keyExists) { - hashTable.add(rowKey) + hashSet.add(rowKey) } } } - new Iterator[Row] { - private[this] var currentStreamedRow: Row = _ - private[this] var currentHashMatched: Boolean = false - - private[this] val joinKeys = streamSideKeyGenerator() - - override final def hasNext: Boolean = - streamIter.hasNext && fetchNext() - - override final def next() = { - currentStreamedRow - } - - /** - * Searches the streamed iterator for the next row that has at least one match in hashtable. - * - * @return true if the search is successful, and false the streamed iterator runs out of - * tuples. - */ - private final def fetchNext(): Boolean = { - currentHashMatched = false - while (!currentHashMatched && streamIter.hasNext) { - currentStreamedRow = streamIter.next() - if (!joinKeys(currentStreamedRow).anyNull) { - currentHashMatched = hashTable.contains(joinKeys.currentValue) - } - } - currentHashMatched - } - } + val joinKeys = streamSideKeyGenerator() + streamIter.filter(current => { + !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue) + }) } } } |