diff options
author | Davies Liu <davies@databricks.com> | 2016-09-06 10:46:31 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-09-06 10:46:31 -0700 |
commit | f7e26d788757f917b32749856bb29feb7b4c2987 (patch) | |
tree | b3efe1ff2f73e345c1e4c955239c648e0b9739ca /sql/core/src/test/scala/org | |
parent | bc2767df2666ff615e7f44e980555afab06dd8a3 (diff) | |
download | spark-f7e26d788757f917b32749856bb29feb7b4c2987.tar.gz spark-f7e26d788757f917b32749856bb29feb7b4c2987.tar.bz2 spark-f7e26d788757f917b32749856bb29feb7b4c2987.zip |
[SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap
## What changes were proposed in this pull request?
In LongToUnsafeRowMap, we use offset of a value as pointer, stored in a array also in the page for chained values. The offset is not portable, because Platform.LONG_ARRAY_OFFSET will be different with different JVM Heap size, then the deserialized LongToUnsafeRowMap will be corrupt.
This PR will change to use portable address (without Platform.LONG_ARRAY_OFFSET).
## How was this patch tested?
Added a test case with random generated keys, to improve the coverage. But this test is not a regression test, that could require a Spark cluster that have at least 32G heap in driver or executor.
Author: Davies Liu <davies@databricks.com>
Closes #14927 from davies/longmap.
Diffstat (limited to 'sql/core/src/test/scala/org')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index 1196f5ec7b..ede63fea96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.joins import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} +import scala.util.Random + import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager} import org.apache.spark.serializer.KryoSerializer @@ -197,6 +199,60 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext { } } + test("LongToUnsafeRowMap with random keys") { + val taskMemoryManager = new TaskMemoryManager( + new StaticMemoryManager( + new SparkConf().set("spark.memory.offHeap.enabled", "false"), + Long.MaxValue, + Long.MaxValue, + 1), + 0) + val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, LongType, false))) + + val N = 1000000 + val rand = new Random + val keys = (0 to N).map(x => rand.nextLong()).toArray + + val map = new LongToUnsafeRowMap(taskMemoryManager, 10) + keys.foreach { k => + map.append(k, unsafeProj(InternalRow(k))) + } + map.optimize() + + val os = new ByteArrayOutputStream() + val out = new ObjectOutputStream(os) + map.writeExternal(out) + out.flush() + val in = new ObjectInputStream(new ByteArrayInputStream(os.toByteArray)) + val map2 = new LongToUnsafeRowMap(taskMemoryManager, 1) + map2.readExternal(in) + + val row = unsafeProj(InternalRow(0L)).copy() + keys.foreach { k => + val r = map2.get(k, row) + assert(r.hasNext) + var c = 0 + while (r.hasNext) { + val rr = r.next() + assert(rr.getLong(0) === k) + c += 1 + } + } + var i = 0 + while (i < N * 10) { + val k = rand.nextLong() + val r = map2.get(k, row) + if (r != null) { + assert(r.hasNext) + while (r.hasNext) { + assert(r.next().getLong(0) === k) + } + } + i += 1 + } + map.free() + } + test("Spark-14521") { val ser = new KryoSerializer( (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance() |