aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorSameer Agarwal <sameer@databricks.com>2016-05-26 15:49:16 -0700
committerReynold Xin <rxin@databricks.com>2016-05-26 15:49:16 -0700
commitfe6de16f781ff659b34e0ddda427d371d3d94536 (patch)
treec5cb8525e3e29aa98ff5d364ff9e96c82e4a327f /core/src
parentb5859e0bb8cc147858cb28d8bdb5ca3b4a2cec77 (diff)
downloadspark-fe6de16f781ff659b34e0ddda427d371d3d94536.tar.gz
spark-fe6de16f781ff659b34e0ddda427d371d3d94536.tar.bz2
spark-fe6de16f781ff659b34e0ddda427d371d3d94536.zip
[SPARK-8428][SPARK-13850] Fix integer overflows in TimSort
## What changes were proposed in this pull request? This patch fixes a few integer overflows in `UnsafeSortDataFormat.copyRange()` and `ShuffleSortDataFormat.copyRange()` that seem to be the most likely cause behind a number of `TimSort` contract violation errors seen in Spark 2.0 and Spark 1.6 while sorting large datasets. ## How was this patch tested? Added a test in `ExternalSorterSuite` that instantiates a large array of the form of [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999] that triggers a `copyRange` in `TimSort.mergeLo` or `TimSort.mergeHi`. Note that the input dataset should contain at least 268.43 million rows with a certain data distribution for an overflow to occur. Author: Sameer Agarwal <sameer@databricks.com> Closes #13336 from sameeragarwal/timsort-bug.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java6
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java6
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala24
3 files changed, 30 insertions, 6 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
index 8f4e322997..1e924d2aec 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
@@ -61,10 +61,10 @@ final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, LongArray> {
public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
Platform.copyMemory(
src.getBaseObject(),
- src.getBaseOffset() + srcPos * 8,
+ src.getBaseOffset() + srcPos * 8L,
dst.getBaseObject(),
- dst.getBaseOffset() + dstPos * 8,
- length * 8
+ dst.getBaseOffset() + dstPos * 8L,
+ length * 8L
);
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
index d19b71fbc1..7bda76907f 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
@@ -75,10 +75,10 @@ public final class UnsafeSortDataFormat
public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
Platform.copyMemory(
src.getBaseObject(),
- src.getBaseOffset() + srcPos * 16,
+ src.getBaseOffset() + srcPos * 16L,
dst.getBaseObject(),
- dst.getBaseOffset() + dstPos * 16,
- length * 16);
+ dst.getBaseOffset() + dstPos * 16L,
+ length * 16L);
}
@Override
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 4dd8e31c27..699f7fa1f2 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -17,12 +17,17 @@
package org.apache.spark.util.collection
+import java.util.Comparator
+
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark._
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.unsafe.array.LongArray
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat}
class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
import TestUtils.{assertNotSpilled, assertSpilled}
@@ -93,6 +98,25 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)(
sortWithoutBreakingSortingContracts)
+ // This test is ignored by default as it requires a fairly large heap size (16GB)
+ ignore("sort without breaking timsort contracts for large arrays") {
+ val size = 300000000
+ // To manifest the bug observed in SPARK-8428 and SPARK-13850, we explicitly use an array of
+ // the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999]
+ // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi()
+ val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i }
+ val buf = new LongArray(MemoryBlock.fromLongArray(ref))
+
+ new Sorter(UnsafeSortDataFormat.INSTANCE).sort(
+ buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] {
+ override def compare(
+ r1: RecordPointerAndKeyPrefix,
+ r2: RecordPointerAndKeyPrefix): Int = {
+ PrefixComparators.LONG.compare(r1.keyPrefix, r2.keyPrefix)
+ }
+ })
+ }
+
test("spilling with hash collisions") {
val size = 1000
val conf = createSparkConf(loadDefaults = true, kryo = false)