From 74335b31072951244967f878d8b766cd1bfc2ac6 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 8 Jul 2015 10:43:00 -0700 Subject: [SPARK-5707] [SQL] fix serialization of generated projection Author: Davies Liu Closes #7272 from davies/fix_projection and squashes the following commits: 075ef76 [Davies Liu] fix codegen with BroadcastHashJoin --- .../org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala | 3 +-- .../scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala | 2 +- .../scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala index 06c244f211..ab757fc7de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala @@ -79,8 +79,7 @@ case class BroadcastHashOuterJoin( // Note that we use .execute().collect() because we don't want to convert data to Scala types val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect() // buildHashTable uses code-generated rows as keys, which are not serializable - val hashed = - buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output)) + val hashed = buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output)) sparkContext.broadcast(hashed) }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index 3337451877..0522ee85ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -171,7 +171,7 @@ override def outputPartitioning: Partitioning = joinType match { var existingMatchList = hashTable.get(rowKey) if (existingMatchList == null) { existingMatchList = new CompactBuffer[InternalRow]() - hashTable.put(rowKey, existingMatchList) + hashTable.put(rowKey.copy(), existingMatchList) } existingMatchList += currentRow.copy() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index de062c791f..6b51f5d415 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -125,7 +125,7 @@ private[joins] object HashedRelation { val existingMatchList = hashTable.get(rowKey) val matchList = if (existingMatchList == null) { val newMatchList = new CompactBuffer[InternalRow]() - hashTable.put(rowKey, newMatchList) + hashTable.put(rowKey.copy(), newMatchList) newMatchList } else { keyIsUnique = false -- cgit v1.2.3