From c3e9a120e33159fb45cd99f3a55fc5cf16cd7c6c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 11 Aug 2015 22:45:18 -0700 Subject: [SPARK-9831] [SQL] fix serialization with empty broadcast Author: Davies Liu Closes #8117 from davies/fix_serialization and squashes the following commits: d21ac71 [Davies Liu] fix serialization with empty broadcast --- .../spark/sql/execution/joins/HashedRelation.scala | 2 +- .../spark/sql/execution/joins/HashedRelationSuite.scala | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index c1bc7947aa..076afe6e4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -299,7 +299,7 @@ private[joins] final class UnsafeHashedRelation( binaryMap = new BytesToBytesMap( taskMemoryManager, shuffleMemoryManager, - nKeys * 2, // reduce hash collision + (nKeys * 1.5 + 1).toInt, // reduce hash collision pageSizeBytes) var i = 0 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index a1fa2c3864..c635b2d51f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -103,4 +103,21 @@ class HashedRelationSuite extends SparkFunSuite { assert(hashed2.get(unsafeData(2)) === data2) assert(numDataRows.value.value === data.length) } + + test("test serialization empty hash map") { + val os = new ByteArrayOutputStream() + val out = new ObjectOutputStream(os) + val hashed = new UnsafeHashedRelation( + new java.util.HashMap[UnsafeRow, CompactBuffer[UnsafeRow]]) + hashed.writeExternal(out) + out.flush() + val in = new ObjectInputStream(new ByteArrayInputStream(os.toByteArray)) + val hashed2 = new UnsafeHashedRelation() + hashed2.readExternal(in) + + val schema = StructType(StructField("a", IntegerType, true) :: Nil) + val toUnsafe = UnsafeProjection.create(schema) + val row = toUnsafe(InternalRow(0)) + assert(hashed2.get(row) === null) + } } -- cgit v1.2.3