aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-09-06 10:46:31 -0700
committerDavies Liu <davies.liu@gmail.com>2016-09-06 10:46:31 -0700
commitf7e26d788757f917b32749856bb29feb7b4c2987 (patch)
treeb3efe1ff2f73e345c1e4c955239c648e0b9739ca /sql/core/src/test/scala
parentbc2767df2666ff615e7f44e980555afab06dd8a3 (diff)
downloadspark-f7e26d788757f917b32749856bb29feb7b4c2987.tar.gz
spark-f7e26d788757f917b32749856bb29feb7b4c2987.tar.bz2
spark-f7e26d788757f917b32749856bb29feb7b4c2987.zip
[SPARK-16922] [SPARK-17211] [SQL] make the address of values portable in LongToUnsafeRowMap
## What changes were proposed in this pull request? In LongToUnsafeRowMap, we use offset of a value as pointer, stored in a array also in the page for chained values. The offset is not portable, because Platform.LONG_ARRAY_OFFSET will be different with different JVM Heap size, then the deserialized LongToUnsafeRowMap will be corrupt. This PR will change to use portable address (without Platform.LONG_ARRAY_OFFSET). ## How was this patch tested? Added a test case with random generated keys, to improve the coverage. But this test is not a regression test, that could require a Spark cluster that have at least 32G heap in driver or executor. Author: Davies Liu <davies@databricks.com> Closes #14927 from davies/longmap.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala56
1 files changed, 56 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 1196f5ec7b..ede63fea96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.joins
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+import scala.util.Random
+
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
import org.apache.spark.serializer.KryoSerializer
@@ -197,6 +199,60 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
}
}
+ test("LongToUnsafeRowMap with random keys") {
+ val taskMemoryManager = new TaskMemoryManager(
+ new StaticMemoryManager(
+ new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ Long.MaxValue,
+ Long.MaxValue,
+ 1),
+ 0)
+ val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, LongType, false)))
+
+ val N = 1000000
+ val rand = new Random
+ val keys = (0 to N).map(x => rand.nextLong()).toArray
+
+ val map = new LongToUnsafeRowMap(taskMemoryManager, 10)
+ keys.foreach { k =>
+ map.append(k, unsafeProj(InternalRow(k)))
+ }
+ map.optimize()
+
+ val os = new ByteArrayOutputStream()
+ val out = new ObjectOutputStream(os)
+ map.writeExternal(out)
+ out.flush()
+ val in = new ObjectInputStream(new ByteArrayInputStream(os.toByteArray))
+ val map2 = new LongToUnsafeRowMap(taskMemoryManager, 1)
+ map2.readExternal(in)
+
+ val row = unsafeProj(InternalRow(0L)).copy()
+ keys.foreach { k =>
+ val r = map2.get(k, row)
+ assert(r.hasNext)
+ var c = 0
+ while (r.hasNext) {
+ val rr = r.next()
+ assert(rr.getLong(0) === k)
+ c += 1
+ }
+ }
+ var i = 0
+ while (i < N * 10) {
+ val k = rand.nextLong()
+ val r = map2.get(k, row)
+ if (r != null) {
+ assert(r.hasNext)
+ while (r.hasNext) {
+ assert(r.next().getLong(0) === k)
+ }
+ }
+ i += 1
+ }
+ map.free()
+ }
+
test("Spark-14521") {
val ser = new KryoSerializer(
(new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()