aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-03-14 22:25:57 -0700
committerReynold Xin <rxin@databricks.com>2016-03-14 22:25:57 -0700
commit9256840cb631cad50852b2b218a1ac71b567084a (patch)
tree8598e38c36486b15503b2ed97ffa0588f9cd2c98 /sql
parente76679a814f5a0903c5f93d9a482f5ddc56fe0d2 (diff)
downloadspark-9256840cb631cad50852b2b218a1ac71b567084a.tar.gz
spark-9256840cb631cad50852b2b218a1ac71b567084a.tar.bz2
spark-9256840cb631cad50852b2b218a1ac71b567084a.zip
[SPARK-13661][SQL] avoid the copy in HashedRelation
## What changes were proposed in this pull request? Avoid the copy in HashedRelation, since most of the HashedRelation are built with Array[Row], added the copy() for LeftSemiJoinHash. This could help to reduce the memory consumption for Broadcast join. ## How was this patch tested? Existing tests. Author: Davies Liu <davies@databricks.com> Closes #11666 from davies/remove_copy.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala2
2 files changed, 9 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 6235897ed1..0b0f59c3e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -156,6 +156,11 @@ private[joins] class UniqueKeyHashedRelation(
private[execution] object HashedRelation {
+ /**
+ * Create a HashedRelation from an Iterator of InternalRow.
+ *
+ * Note: The caller should make sure that these InternalRow are different objects.
+ */
def apply(
input: Iterator[InternalRow],
keyGenerator: Projection,
@@ -188,7 +193,7 @@ private[execution] object HashedRelation {
keyIsUnique = false
existingMatchList
}
- matchList += currentRow.copy()
+ matchList += currentRow
}
}
@@ -438,7 +443,7 @@ private[joins] object UnsafeHashedRelation {
} else {
existingMatchList
}
- matchList += unsafeRow.copy()
+ matchList += unsafeRow
}
}
@@ -622,7 +627,7 @@ private[joins] object LongHashedRelation {
keyIsUnique = false
existingMatchList
}
- matchList += unsafeRow.copy()
+ matchList += unsafeRow
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 242ed61232..14389e45ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -47,7 +47,7 @@ case class LeftSemiJoinHash(
val numOutputRows = longMetric("numOutputRows")
right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
- val hashRelation = HashedRelation(buildIter, rightKeyGenerator)
+ val hashRelation = HashedRelation(buildIter.map(_.copy()), rightKeyGenerator)
hashSemiJoin(streamIter, hashRelation, numOutputRows)
}
}