aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-09-02 22:15:54 -0700
committerAndrew Or <andrew@databricks.com>2015-09-02 22:15:54 -0700
commit62b4690d6b3016f41292b640ac28644ef31e299d (patch)
treea4a329779cb3741bf29c9bb2e723d204136ff468 /core
parent3ddb9b32335154e47890a0c761e0dfea3ccaac7b (diff)
downloadspark-62b4690d6b3016f41292b640ac28644ef31e299d.tar.gz
spark-62b4690d6b3016f41292b640ac28644ef31e299d.tar.bz2
spark-62b4690d6b3016f41292b640ac28644ef31e299d.zip
[SPARK-10379] preserve first page in UnsafeShuffleExternalSorter
Author: Davies Liu <davies@databricks.com> Closes #8543 from davies/preserve_page.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala2
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java5
3 files changed, 8 insertions, 3 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index 3d1ef0c48a..e73ba39468 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -122,6 +122,10 @@ final class UnsafeShuffleExternalSorter {
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
+
+ // preserve first page to ensure that we have at least one page to work with. Otherwise,
+ // other operators in the same task may starve this sorter (SPARK-9709).
+ acquireNewPageIfNecessary(pageSizeBytes);
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
index 1f2213d0c4..417ff5278d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
@@ -41,7 +41,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M
// In certain join operations, prepare can be called on the same partition multiple times.
// In this case, we need to ensure that each call to compute gets a separate prepare argument.
- private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
+ private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
/**
* Prepare a partition for a single call to compute.
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 94650be536..a266b0c36e 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -530,8 +530,9 @@ public class UnsafeShuffleWriterSuite {
for (int i = 0; i < numRecordsPerPage * 10; i++) {
writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
newPeakMemory = writer.getPeakMemoryUsedBytes();
- if (i % numRecordsPerPage == 0) {
- // We allocated a new page for this record, so peak memory should change
+ if (i % numRecordsPerPage == 0 && i != 0) {
+ // The first page is allocated in constructor, another page will be allocated after
+ // every numRecordsPerPage records (peak memory should change).
assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
} else {
assertEquals(previousPeakMemory, newPeakMemory);