diff options
author | Davies Liu <davies@databricks.com> | 2016-03-14 22:25:57 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-14 22:25:57 -0700 |
commit | 9256840cb631cad50852b2b218a1ac71b567084a (patch) | |
tree | 8598e38c36486b15503b2ed97ffa0588f9cd2c98 /sql | |
parent | e76679a814f5a0903c5f93d9a482f5ddc56fe0d2 (diff) | |
download | spark-9256840cb631cad50852b2b218a1ac71b567084a.tar.gz spark-9256840cb631cad50852b2b218a1ac71b567084a.tar.bz2 spark-9256840cb631cad50852b2b218a1ac71b567084a.zip |
[SPARK-13661][SQL] avoid the copy in HashedRelation
## What changes were proposed in this pull request?
Avoid the copy in HashedRelation, since most of the HashedRelation are built with Array[Row], added the copy() for LeftSemiJoinHash. This could help to reduce the memory consumption for Broadcast join.
## How was this patch tested?
Existing tests.
Author: Davies Liu <davies@databricks.com>
Closes #11666 from davies/remove_copy.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 11 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala | 2 |
2 files changed, 9 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 6235897ed1..0b0f59c3e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -156,6 +156,11 @@ private[joins] class UniqueKeyHashedRelation( private[execution] object HashedRelation { + /** + * Create a HashedRelation from an Iterator of InternalRow. + * + * Note: The caller should make sure that these InternalRow are different objects. + */ def apply( input: Iterator[InternalRow], keyGenerator: Projection, @@ -188,7 +193,7 @@ private[execution] object HashedRelation { keyIsUnique = false existingMatchList } - matchList += currentRow.copy() + matchList += currentRow } } @@ -438,7 +443,7 @@ private[joins] object UnsafeHashedRelation { } else { existingMatchList } - matchList += unsafeRow.copy() + matchList += unsafeRow } } @@ -622,7 +627,7 @@ private[joins] object LongHashedRelation { keyIsUnique = false existingMatchList } - matchList += unsafeRow.copy() + matchList += unsafeRow } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala index 242ed61232..14389e45ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala @@ -47,7 +47,7 @@ case class LeftSemiJoinHash( val numOutputRows = longMetric("numOutputRows") right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) => - val hashRelation = HashedRelation(buildIter, rightKeyGenerator) + val hashRelation = HashedRelation(buildIter.map(_.copy()), rightKeyGenerator) hashSemiJoin(streamIter, hashRelation, numOutputRows) } } |