diff options
author | Davies Liu <davies@databricks.com> | 2015-07-08 10:43:00 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-07-08 10:43:00 -0700 |
commit | 74335b31072951244967f878d8b766cd1bfc2ac6 (patch) | |
tree | f402a910b94ddce1edb3758ab45b00aa403867b2 | |
parent | 3e831a26965a5e92210431f9ad6935f70aa01b48 (diff) | |
download | spark-74335b31072951244967f878d8b766cd1bfc2ac6.tar.gz spark-74335b31072951244967f878d8b766cd1bfc2ac6.tar.bz2 spark-74335b31072951244967f878d8b766cd1bfc2ac6.zip |
[SPARK-5707] [SQL] fix serialization of generated projection
Author: Davies Liu <davies@databricks.com>
Closes #7272 from davies/fix_projection and squashes the following commits:
075ef76 [Davies Liu] fix codegen with BroadcastHashJoin
3 files changed, 3 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala index 06c244f211..ab757fc7de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala @@ -79,8 +79,7 @@ case class BroadcastHashOuterJoin( // Note that we use .execute().collect() because we don't want to convert data to Scala types
val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
// buildHashTable uses code-generated rows as keys, which are not serializable
- val hashed =
- buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output))
+ val hashed = buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output))
sparkContext.broadcast(hashed)
}(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index 3337451877..0522ee85ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -171,7 +171,7 @@ override def outputPartitioning: Partitioning = joinType match { var existingMatchList = hashTable.get(rowKey) if (existingMatchList == null) { existingMatchList = new CompactBuffer[InternalRow]() - hashTable.put(rowKey, existingMatchList) + hashTable.put(rowKey.copy(), existingMatchList) } existingMatchList += currentRow.copy() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index de062c791f..6b51f5d415 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -125,7 +125,7 @@ private[joins] object HashedRelation { val existingMatchList = hashTable.get(rowKey) val matchList = if (existingMatchList == null) { val newMatchList = new CompactBuffer[InternalRow]() - hashTable.put(rowKey, newMatchList) + hashTable.put(rowKey.copy(), newMatchList) newMatchList } else { keyIsUnique = false |