aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala2
2 files changed, 9 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 6235897ed1..0b0f59c3e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -156,6 +156,11 @@ private[joins] class UniqueKeyHashedRelation(
private[execution] object HashedRelation {
+ /**
+ * Create a HashedRelation from an Iterator of InternalRow.
+ *
+ * Note: The caller should make sure that these InternalRow are different objects.
+ */
def apply(
input: Iterator[InternalRow],
keyGenerator: Projection,
@@ -188,7 +193,7 @@ private[execution] object HashedRelation {
keyIsUnique = false
existingMatchList
}
- matchList += currentRow.copy()
+ matchList += currentRow
}
}
@@ -438,7 +443,7 @@ private[joins] object UnsafeHashedRelation {
} else {
existingMatchList
}
- matchList += unsafeRow.copy()
+ matchList += unsafeRow
}
}
@@ -622,7 +627,7 @@ private[joins] object LongHashedRelation {
keyIsUnique = false
existingMatchList
}
- matchList += unsafeRow.copy()
+ matchList += unsafeRow
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 242ed61232..14389e45ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -47,7 +47,7 @@ case class LeftSemiJoinHash(
val numOutputRows = longMetric("numOutputRows")
right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
- val hashRelation = HashedRelation(buildIter, rightKeyGenerator)
+ val hashRelation = HashedRelation(buildIter.map(_.copy()), rightKeyGenerator)
hashSemiJoin(streamIter, hashRelation, numOutputRows)
}
}