about summary refs log tree commit diff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-07-08 10:43:00 -0700
committerMichael Armbrust <michael@databricks.com>2015-07-08 10:43:00 -0700
commit74335b31072951244967f878d8b766cd1bfc2ac6 (patch)
treef402a910b94ddce1edb3758ab45b00aa403867b2
parent3e831a26965a5e92210431f9ad6935f70aa01b48 (diff)
downloadspark-74335b31072951244967f878d8b766cd1bfc2ac6.tar.gz
spark-74335b31072951244967f878d8b766cd1bfc2ac6.tar.bz2
spark-74335b31072951244967f878d8b766cd1bfc2ac6.zip
[SPARK-5707] [SQL] fix serialization of generated projection
Author: Davies Liu <davies@databricks.com> Closes #7272 from davies/fix_projection and squashes the following commits: 075ef76 [Davies Liu] fix codegen with BroadcastHashJoin
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala | 3
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 2
3 files changed, 3 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index 06c244f211..ab757fc7de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -79,8 +79,7 @@ case class BroadcastHashOuterJoin(
// Note that we use .execute().collect() because we don't want to convert data to Scala types
val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
// buildHashTable uses code-generated rows as keys, which are not serializable
- val hashed =
- buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output))
+ val hashed = buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output))
sparkContext.broadcast(hashed)
}(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 3337451877..0522ee85ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -171,7 +171,7 @@ override def outputPartitioning: Partitioning = joinType match {
var existingMatchList = hashTable.get(rowKey)
if (existingMatchList == null) {
existingMatchList = new CompactBuffer[InternalRow]()
- hashTable.put(rowKey, existingMatchList)
+ hashTable.put(rowKey.copy(), existingMatchList)
}
existingMatchList += currentRow.copy()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index de062c791f..6b51f5d415 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -125,7 +125,7 @@ private[joins] object HashedRelation {
val existingMatchList = hashTable.get(rowKey)
val matchList = if (existingMatchList == null) {
val newMatchList = new CompactBuffer[InternalRow]()
- hashTable.put(rowKey, newMatchList)
+ hashTable.put(rowKey.copy(), newMatchList)
newMatchList
} else {
keyIsUnique = false