author     Andrew Or <andrew@databricks.com>  2015-10-13 13:49:59 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2015-10-13 13:49:59 -0700
commit     b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f (patch)
tree       058d6885c0fffa8cfb496f5c4ed675f6a5345f75 /core/src/test
parent     2b574f52d7bf51b1fe2a73086a3735b633e9083f (diff)
download   spark-b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f.tar.gz
           spark-b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f.tar.bz2
           spark-b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f.zip
[SPARK-10983] Unified memory manager
This patch unifies the memory management of the storage and execution regions such that either side can borrow memory from the other. When memory pressure arises, storage is evicted in favor of execution. To avoid regressions in cases where storage is crucial, we dynamically allocate a fraction of space for storage that execution cannot evict. Several configurations are introduced:

- **spark.memory.fraction (default 0.75)**: fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.
- **spark.memory.storageFraction (default 0.5)**: size of the storage region within the space set aside by `spark.memory.fraction`. Cached data may only be evicted if total storage exceeds this region.
- **spark.memory.useLegacyMode (default false)**: whether to use the memory management that existed in Spark 1.5 and before. This is mainly for backward compatibility.

For a detailed description of the design, see [SPARK-10000](https://issues.apache.org/jira/browse/SPARK-10000). This patch builds on top of the `MemoryManager` interface introduced in #9000.

Author: Andrew Or <andrew@databricks.com>

Closes #9084 from andrewor14/unified-memory-manager.
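As a concrete illustration, an application opts into these settings through `SparkConf`. The sketch below merely spells out the documented defaults; the app name and master are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // All three values below are the documented defaults, written out explicitly.
    val conf = new SparkConf()
      .setAppName("unified-memory-demo")            // placeholder
      .setMaster("local[*]")                        // placeholder
      .set("spark.memory.fraction", "0.75")         // heap share for execution + storage
      .set("spark.memory.storageFraction", "0.5")   // protected storage region within that share
      .set("spark.memory.useLegacyMode", "false")   // "true" restores the pre-1.6 static split
    val sc = new SparkContext(conf)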
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala                           |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala                               |   6
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala                  | 133
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala            | 105
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala           | 208
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala          |   5
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala          |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala |   9
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala        |  23
9 files changed, 402 insertions, 97 deletions
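The borrowing rules described above are pinned down by the new UnifiedMemoryManagerSuite below. As a rough mental model only (an illustrative sketch, not the actual UnifiedMemoryManager code; it deliberately ignores storage evicting its own cached blocks via `ensureFreeSpace`):

    // Hypothetical model of the unified pool; the class and method names are illustrative.
    // With maxMemory = 1000 and storageFraction = 0.5, storageRegionSize = 500,
    // matching the constants used in UnifiedMemoryManagerSuite.
    class UnifiedPoolModel(maxMemory: Long, storageRegionSize: Long) {
      private var executionUsed = 0L
      private var storageUsed = 0L

      /** Execution may evict storage, but never below the protected storage region. */
      def acquireExecution(numBytes: Long): Long = {
        val free = maxMemory - executionUsed - storageUsed
        val evictable = math.max(0L, storageUsed - storageRegionSize)
        val granted = math.min(numBytes, free + evictable)
        val evicted = math.max(0L, granted - free)
        storageUsed -= evicted      // cached blocks would be dropped here
        executionUsed += granted
        granted
      }

      /** Storage may fill free memory but never evicts execution. */
      def acquireStorage(numBytes: Long): Boolean = {
        val free = maxMemory - executionUsed - storageUsed
        if (numBytes <= free) { storageUsed += numBytes; true } else false
      }
    }

For example, with 100 bytes of execution and 750 bytes of storage in use out of 1000, an execution request for 200 bytes is granted in full by evicting 50 bytes of storage (750 exceeds the 500-byte protected region) — exactly the scenario in the "execution evicts storage" test below.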
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 600c1403b0..34a4bb968e 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -213,11 +213,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
test("compute when only some partitions fit in memory") {
- val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
- sc = new SparkContext(clusterUrl, "test", conf)
- // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
- // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
- // to make sure that *some* of them do fit though
+ sc = new SparkContext(clusterUrl, "test", new SparkConf)
+ // TODO: verify that only a subset of partitions fit in memory (SPARK-11078)
val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index d91b799ecf..4a0877d86f 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -247,11 +247,13 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
.setMaster("local")
.set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
.set("spark.shuffle.compress", shuffleCompress.toString)
- .set("spark.shuffle.memoryFraction", "0.001")
resetSparkContext()
sc = new SparkContext(myConf)
+ val diskBlockManager = sc.env.blockManager.diskBlockManager
try {
- sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
+ assert(diskBlockManager.getAllFiles().isEmpty)
+ sc.parallelize(0 until 10).map(i => (i / 4, i)).groupByKey().collect()
+ assert(diskBlockManager.getAllFiles().nonEmpty)
} catch {
case e: Exception =>
val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," +
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
new file mode 100644
index 0000000000..36e4566310
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.mockito.Matchers.{any, anyLong}
+import org.mockito.Mockito.{mock, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.MemoryStore
+
+
+/**
+ * Helper trait for sharing code among [[MemoryManager]] tests.
+ */
+private[memory] trait MemoryManagerSuite extends SparkFunSuite {
+
+ import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
+
+ // Note: Mockito's verify mechanism does not provide a way to reset method call counts
+ // without also resetting stubbed methods. Since our test code relies on the latter,
+ // we need to use our own variable to track invocations of `ensureFreeSpace`.
+
+ /**
+ * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]].
+ *
+ * This is set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test
+ * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]].
+ */
+ private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+
+ /**
+ * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed.
+ *
+ * This allows our test code to release storage memory when [[MemoryStore.ensureFreeSpace]]
+ * is called without relying on [[org.apache.spark.storage.BlockManager]] and all of its
+ * dependencies.
+ */
+ protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
+ val ms = mock(classOf[MemoryStore])
+ when(ms.ensureFreeSpace(anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
+ when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
+ mm.setMemoryStore(ms)
+ ms
+ }
+
+ /**
+ * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the right arguments.
+ */
+ private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = {
+ new Answer[Boolean] {
+ override def answer(invocation: InvocationOnMock): Boolean = {
+ val args = invocation.getArguments
+ require(args.size > numBytesPos, s"bad test: expected >$numBytesPos arguments " +
+ s"in ensureFreeSpace, found ${args.size}")
+ require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
+ s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
+ val numBytes = args(numBytesPos).asInstanceOf[Long]
+ mockEnsureFreeSpace(mm, numBytes)
+ }
+ }
+ }
+
+ /**
+ * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory.
+ *
+ * This is a significant simplification of the real method, which actually drops existing
+ * blocks based on the size of each block. Instead, here we simply release as many bytes
+ * as needed to ensure the requested amount of free space. This allows us to set up the
+ * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
+ * many other dependencies.
+ *
+ * Every call to this method sets the shared variable [[ensureFreeSpaceCalled]], which
+ * records the number of bytes requested. This variable is expected to be cleared
+ * by the test code later through [[assertEnsureFreeSpaceCalled]].
+ */
+ private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean = mm.synchronized {
+ require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
+ "bad test: ensure free space variable was not reset")
+ // Record the number of bytes requested in this call
+ ensureFreeSpaceCalled.set(numBytes)
+ if (numBytes <= mm.maxStorageMemory) {
+ def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed
+ val spaceToRelease = numBytes - freeMemory
+ if (spaceToRelease > 0) {
+ mm.releaseStorageMemory(spaceToRelease)
+ }
+ freeMemory >= numBytes
+ } else {
+ // We attempted to free more bytes than our max allowable memory
+ false
+ }
+ }
+
+ /**
+ * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters.
+ */
+ protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = {
+ assert(ensureFreeSpaceCalled.get() === numBytes,
+ s"expected ensure free space to be called with $numBytes")
+ ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+ }
+
+ /**
+ * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called.
+ */
+ protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = {
+ assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
+ "ensure free space should not have been called!")
+ }
+}
+
+private object MemoryManagerSuite {
+ private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L
+}
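A minimal standalone sketch of the Mockito limitation noted in the trait above (hypothetical, for illustration only): `reset` clears stubbed answers along with call counts, so after a reset the mock falls back to the default `false` answer.

    import org.mockito.Matchers.{any, anyLong}
    import org.mockito.Mockito.{mock, reset, when}

    val ms = mock(classOf[MemoryStore])
    when(ms.ensureFreeSpace(anyLong(), any())).thenReturn(true)
    assert(ms.ensureFreeSpace(10L, null))    // stubbed answer: true
    reset(ms)                                // clears call counts AND stubs
    assert(!ms.ensureFreeSpace(10L, null))   // default answer for Boolean: false

This is why the trait tracks invocations with its own AtomicLong rather than Mockito's verify/reset.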
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index c436a8b5c9..6cae1f871e 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -19,32 +19,44 @@ package org.apache.spark.memory
import scala.collection.mutable.ArrayBuffer
-import org.mockito.Mockito.{mock, reset, verify, when}
-import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito.when
+import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
-import org.apache.spark.{SparkConf, SparkFunSuite}
-class StaticMemoryManagerSuite extends SparkFunSuite {
+class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
+ private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
+ /**
+ * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
+ */
+ private def makeThings(
+ maxExecutionMem: Long,
+ maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
+ val mm = new StaticMemoryManager(
+ conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem)
+ val ms = makeMemoryStore(mm)
+ (mm, ms)
+ }
test("basic execution memory") {
val maxExecutionMem = 1000L
val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
assert(mm.executionMemoryUsed === 0L)
- assert(mm.acquireExecutionMemory(10L) === 10L)
+ assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.acquireExecutionMemory(100L) === 100L)
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
// Acquire up to the max
- assert(mm.acquireExecutionMemory(1000L) === 890L)
+ assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- assert(mm.acquireExecutionMemory(1L) === 0L)
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
assert(mm.executionMemoryUsed === maxExecutionMem)
mm.releaseExecutionMemory(800L)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireExecutionMemory(1L) === 1L)
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
mm.releaseExecutionMemory(maxExecutionMem)
@@ -54,37 +66,36 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
test("basic storage memory") {
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("you can see the world you brought to live")
- val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
// `ensureFreeSpace` should be called with the number of bytes requested
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L)
+ assertEnsureFreeSpaceCalled(ms, 10L)
assert(mm.storageMemoryUsed === 10L)
- assert(evictedBlocks.isEmpty)
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+ assertEnsureFreeSpaceCalled(ms, 100L)
assert(mm.storageMemoryUsed === 110L)
- // Acquire up to the max, not granted
- assert(!mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L)
+ // Acquire more than the max, not granted
+ assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L)
assert(mm.storageMemoryUsed === 110L)
- assert(mm.acquireStorageMemory(dummyBlock, 890L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L)
+ // Acquire up to the max, requests after this are still granted due to LRU eviction
+ assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1000L)
assert(mm.storageMemoryUsed === 1000L)
- assert(!mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 1000L)
mm.releaseStorageMemory(800L)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 201L)
- mm.releaseStorageMemory()
+ mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
mm.releaseStorageMemory(100L)
@@ -95,18 +106,17 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
val maxExecutionMem = 200L
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("ain't nobody love like you do")
- val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
// Only execution memory should increase
- assert(mm.acquireExecutionMemory(100L) === 100L)
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 100L)
- assert(mm.acquireExecutionMemory(1000L) === 100L)
+ assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
- assert(mm.acquireStorageMemory(dummyBlock, 50L, dummyBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 50L)
+ assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 50L)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 200L)
// Only execution memory should be released
@@ -114,7 +124,7 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 67L)
// Only storage memory should be released
- mm.releaseStorageMemory()
+ mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 67L)
}
@@ -122,51 +132,26 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
test("unroll memory") {
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("lonely water")
- val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
- assert(mm.acquireUnrollMemory(dummyBlock, 100L, dummyBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+ assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 100L)
assert(mm.storageMemoryUsed === 100L)
mm.releaseUnrollMemory(40L)
assert(mm.storageMemoryUsed === 60L)
when(ms.currentUnrollMemory).thenReturn(60L)
- assert(mm.acquireUnrollMemory(dummyBlock, 500L, dummyBlocks))
+ assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
// `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
// Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 340L)
+ assertEnsureFreeSpaceCalled(ms, 340L)
assert(mm.storageMemoryUsed === 560L)
when(ms.currentUnrollMemory).thenReturn(560L)
- assert(!mm.acquireUnrollMemory(dummyBlock, 800L, dummyBlocks))
+ assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks))
assert(mm.storageMemoryUsed === 560L)
// We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 0L)
+ assertEnsureFreeSpaceCalled(ms, 0L)
// Release beyond what was acquired
mm.releaseUnrollMemory(maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
}
- /**
- * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
- */
- private def makeThings(
- maxExecutionMem: Long,
- maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
- val mm = new StaticMemoryManager(
- conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem)
- val ms = mock(classOf[MemoryStore])
- mm.setMemoryStore(ms)
- (mm, ms)
- }
-
- /**
- * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters.
- */
- private def assertEnsureFreeSpaceCalled(
- ms: MemoryStore,
- blockId: BlockId,
- numBytes: Long): Unit = {
- verify(ms).ensureFreeSpace(meq(blockId), meq(numBytes: java.lang.Long), any())
- reset(ms)
- }
-
}
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
new file mode 100644
index 0000000000..e7baa50dc2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.PrivateMethodTester
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
+
+
+class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
+ private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5")
+ private val dummyBlock = TestBlockId("--")
+ private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
+ /**
+ * Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
+ */
+ private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = {
+ val mm = new UnifiedMemoryManager(conf, maxMemory)
+ val ms = makeMemoryStore(mm)
+ (mm, ms)
+ }
+
+ private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = {
+ mm invokePrivate PrivateMethod[Long]('storageRegionSize)()
+ }
+
+ test("storage region size") {
+ val maxMemory = 1000L
+ val (mm, _) = makeThings(maxMemory)
+ val storageFraction = conf.get("spark.memory.storageFraction").toDouble
+ val expectedStorageRegionSize = maxMemory * storageFraction
+ val actualStorageRegionSize = getStorageRegionSize(mm)
+ assert(expectedStorageRegionSize === actualStorageRegionSize)
+ }
+
+ test("basic execution memory") {
+ val maxMemory = 1000L
+ val (mm, _) = makeThings(maxMemory)
+ assert(mm.executionMemoryUsed === 0L)
+ assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
+ assert(mm.executionMemoryUsed === 10L)
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ // Acquire up to the max
+ assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
+ assert(mm.executionMemoryUsed === maxMemory)
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
+ assert(mm.executionMemoryUsed === maxMemory)
+ mm.releaseExecutionMemory(800L)
+ assert(mm.executionMemoryUsed === 200L)
+ // Acquire after release
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
+ assert(mm.executionMemoryUsed === 201L)
+ // Release beyond what was acquired
+ mm.releaseExecutionMemory(maxMemory)
+ assert(mm.executionMemoryUsed === 0L)
+ }
+
+ test("basic storage memory") {
+ val maxMemory = 1000L
+ val (mm, ms) = makeThings(maxMemory)
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
+ // `ensureFreeSpace` should be called with the number of bytes requested
+ assertEnsureFreeSpaceCalled(ms, 10L)
+ assert(mm.storageMemoryUsed === 10L)
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 100L)
+ assert(mm.storageMemoryUsed === 110L)
+ // Acquire more than the max, not granted
+ assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
+ assert(mm.storageMemoryUsed === 110L)
+ // Acquire up to the max, requests after this are still granted due to LRU eviction
+ assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1000L)
+ assert(mm.storageMemoryUsed === 1000L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
+ assert(mm.storageMemoryUsed === 1000L)
+ mm.releaseStorageMemory(800L)
+ assert(mm.storageMemoryUsed === 200L)
+ // Acquire after release
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
+ assert(mm.storageMemoryUsed === 201L)
+ mm.releaseAllStorageMemory()
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
+ assert(mm.storageMemoryUsed === 1L)
+ // Release beyond what was acquired
+ mm.releaseStorageMemory(100L)
+ assert(mm.storageMemoryUsed === 0L)
+ }
+
+ test("execution evicts storage") {
+ val maxMemory = 1000L
+ val (mm, ms) = makeThings(maxMemory)
+ // First, ensure the test classes are set up as expected
+ val expectedStorageRegionSize = 500L
+ val expectedExecutionRegionSize = 500L
+ val storageRegionSize = getStorageRegionSize(mm)
+ val executionRegionSize = maxMemory - expectedStorageRegionSize
+ require(storageRegionSize === expectedStorageRegionSize,
+ "bad test: storage region size is unexpected")
+ require(executionRegionSize === expectedExecutionRegionSize,
+ "bad test: storage region size is unexpected")
+ // Acquire enough storage memory to exceed the storage region
+ assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 750L)
+ assert(mm.executionMemoryUsed === 0L)
+ assert(mm.storageMemoryUsed === 750L)
+ require(mm.storageMemoryUsed > storageRegionSize,
+ s"bad test: storage memory used should exceed the storage region")
+ // Execution needs to request 250 bytes to evict storage memory
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.executionMemoryUsed === 100L)
+ assert(mm.storageMemoryUsed === 750L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ // Execution wants 200 bytes but only 150 are free, so storage is evicted
+ assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assertEnsureFreeSpaceCalled(ms, 200L)
+ assert(mm.executionMemoryUsed === 300L)
+ mm.releaseAllStorageMemory()
+ require(mm.executionMemoryUsed < executionRegionSize,
+ s"bad test: execution memory used should be within the execution region")
+ require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
+ // Acquire some storage memory again, but this time keep it within the storage region
+ assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 400L)
+ require(mm.storageMemoryUsed < storageRegionSize,
+ s"bad test: storage memory used should be within the storage region")
+ // Execution cannot evict storage because the latter is within the storage fraction,
+ // so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
+ assert(mm.acquireExecutionMemory(400L, evictedBlocks) === 300L)
+ assert(mm.executionMemoryUsed === 600L)
+ assert(mm.storageMemoryUsed === 400L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ }
+
+ test("storage does not evict execution") {
+ val maxMemory = 1000L
+ val (mm, ms) = makeThings(maxMemory)
+ // First, ensure the test classes are set up as expected
+ val expectedStorageRegionSize = 500L
+ val expectedExecutionRegionSize = 500L
+ val storageRegionSize = getStorageRegionSize(mm)
+ val executionRegionSize = maxMemory - expectedStorageRegionSize
+ require(storageRegionSize === expectedStorageRegionSize,
+ "bad test: storage region size is unexpected")
+ require(executionRegionSize === expectedExecutionRegionSize,
+ "bad test: storage region size is unexpected")
+ // Acquire enough execution memory to exceed the execution region
+ assert(mm.acquireExecutionMemory(800L, evictedBlocks) === 800L)
+ assert(mm.executionMemoryUsed === 800L)
+ assert(mm.storageMemoryUsed === 0L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ require(mm.executionMemoryUsed > executionRegionSize,
+ s"bad test: execution memory used should exceed the execution region")
+ // Storage should not be able to evict execution
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 800L)
+ assert(mm.storageMemoryUsed === 100L)
+ assertEnsureFreeSpaceCalled(ms, 100L)
+ assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 800L)
+ assert(mm.storageMemoryUsed === 100L)
+ assertEnsureFreeSpaceCalled(ms, 250L)
+ mm.releaseExecutionMemory(maxMemory)
+ mm.releaseStorageMemory(maxMemory)
+ // Acquire some execution memory again, but this time keep it within the execution region
+ assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assert(mm.executionMemoryUsed === 200L)
+ assert(mm.storageMemoryUsed === 0L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ require(mm.executionMemoryUsed < executionRegionSize,
+ s"bad test: execution memory used should be within the execution region")
+ // Storage should still not be able to evict execution
+ assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 200L)
+ assert(mm.storageMemoryUsed === 750L)
+ assertEnsureFreeSpaceCalled(ms, 750L)
+ assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 200L)
+ assert(mm.storageMemoryUsed === 750L)
+ assertEnsureFreeSpaceCalled(ms, 850L)
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index 6d45b1a101..5877aa042d 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -24,7 +24,8 @@ import org.mockito.Mockito._
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.executor.TaskMetrics
class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
@@ -37,7 +38,9 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
try {
val taskAttemptId = nextTaskAttemptId.getAndIncrement
val mockTaskContext = mock(classOf[TaskContext], RETURNS_SMART_NULLS)
+ val taskMetrics = new TaskMetrics
when(mockTaskContext.taskAttemptId()).thenReturn(taskAttemptId)
+ when(mockTaskContext.taskMetrics()).thenReturn(taskMetrics)
TaskContext.setTaskContext(mockTaskContext)
body
} finally {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
index 6351539e91..259020a2dd 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
@@ -36,9 +36,6 @@ class UnsafeShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
override def beforeAll() {
conf.set("spark.shuffle.manager", "tungsten-sort")
- // UnsafeShuffleManager requires at least 128 MB of memory per task in order to be able to sort
- // shuffle records.
- conf.set("spark.shuffle.memoryFraction", "0.5")
}
test("UnsafeShuffleManager properly cleans up files for shuffles that use the new shuffle path") {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 12e9bafcc9..0a03c32c64 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.io.CompressionCodec
+// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
+
class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private def createCombiner[T](i: T) = ArrayBuffer[T](i)
@@ -243,7 +245,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
*/
private def testSimpleSpilling(codec: Option[String] = None): Unit = {
val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
// reduceByKey - should spill ~8 times
@@ -291,7 +292,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[String]
@@ -340,7 +340,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with many hash collisions") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
@@ -365,7 +364,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions using the Int.MaxValue key") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
@@ -382,7 +380,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with null keys and values") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
@@ -401,8 +398,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("external aggregation updates peak execution memory") {
val conf = createSparkConf(loadDefaults = false)
- .set("spark.shuffle.memoryFraction", "0.001")
.set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter
+ .set("spark.testing.memory", (10 * 1024 * 1024).toString)
sc = new SparkContext("local", "test", conf)
// No spilling
AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without spilling") {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index bdb0f4d507..651c7eaa65 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -24,6 +24,8 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
+
class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
val conf = new SparkConf(loadDefaults)
@@ -38,6 +40,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
// Ensure that we actually have multiple batches per spill file
conf.set("spark.shuffle.spill.batchSize", "10")
+ conf.set("spark.testing.memory", "2000000")
conf
}
@@ -50,7 +53,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def emptyDataStream(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -91,7 +93,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def fewElementsPerPartition(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -140,7 +141,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def emptyPartitionsWithSpilling(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -174,7 +174,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def testSpillingInLocalCluster(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
@@ -252,7 +251,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -323,7 +321,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in sorter") {
val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -348,7 +345,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in sorter if there are errors") {
val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -372,7 +368,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in shuffle") {
val conf = createSparkConf(false, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -387,7 +382,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in shuffle with errors") {
val conf = createSparkConf(false, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -416,7 +410,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def noPartialAggregationOrSorting(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -438,7 +431,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def partialAggregationWithoutSpill(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -461,7 +453,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def partialAggregationWIthSpillNoOrdering(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -485,7 +476,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def partialAggregationWithSpillWithOrdering(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -512,7 +502,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def sortingWithoutAggregationNoSpill(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -536,7 +525,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def sortingWithoutAggregationWithSpill(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -553,7 +541,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
@@ -610,7 +597,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with many hash collisions") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
@@ -633,7 +619,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions using the Int.MaxValue key") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i)
@@ -657,7 +642,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with null keys and values") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
@@ -693,7 +677,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
private def sortWithoutBreakingSortingContracts(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.01")
conf.set("spark.shuffle.manager", "sort")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)