Diffstat (limited to 'core/src/test/scala/org/apache/spark')
-rw-r--r--  core/src/test/scala/org/apache/spark/FailureSuite.scala                               |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/GrantEverythingMemoryManager.scala        |  54
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala                  | 134
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala                  |  37
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala            |  24
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala           |  26
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala          | 326
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala       |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala                  |   8
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala |  60
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala        |  48
11 files changed, 323 insertions, 402 deletions
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index f58756e6f6..0242cbc924 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -149,7 +149,7 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
// cause is preserved
val thrownDueToTaskFailure = intercept[SparkException] {
sc.parallelize(Seq(0)).mapPartitions { iter =>
- TaskContext.get().taskMemoryManager().allocate(128)
+ TaskContext.get().taskMemoryManager().allocatePage(128)
throw new Exception("intentional task failure")
iter
}.count()
@@ -159,7 +159,7 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
// If the task succeeded but memory was leaked, then the task should fail due to that leak
val thrownDueToMemoryLeak = intercept[SparkException] {
sc.parallelize(Seq(0)).mapPartitions { iter =>
- TaskContext.get().taskMemoryManager().allocate(128)
+ TaskContext.get().taskMemoryManager().allocatePage(128)
iter
}.count()
}
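
Note: both hunks above lean on TaskMemoryManager's new leak detection, which fails a task that
finishes while still holding pages. For contrast, a well-behaved task frees its page before
completing; a minimal sketch, assuming freePage is the counterpart of the allocatePage call used
in the test:

    val tmm = TaskContext.get().taskMemoryManager()
    val page = tmm.allocatePage(128)  // acquire a page, as the test does
    try {
      // ... use the page ...
    } finally {
      tmm.freePage(page)  // assumed counterpart call; releasing here avoids the leak
    }                     // that the second test asserts on
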
diff --git a/core/src/test/scala/org/apache/spark/memory/GrantEverythingMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/GrantEverythingMemoryManager.scala
new file mode 100644
index 0000000000..fe102d8aeb
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/GrantEverythingMemoryManager.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{BlockStatus, BlockId}
+
+class GrantEverythingMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores = 1) {
+ private[memory] override def doAcquireExecutionMemory(
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
+ if (oom) {
+ oom = false
+ 0
+ } else {
+ _executionMemoryUsed += numBytes // To suppress warnings when freeing unallocated memory
+ numBytes
+ }
+ }
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
+ override def acquireUnrollMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
+ override def releaseStorageMemory(numBytes: Long): Unit = { }
+ override def maxExecutionMemory: Long = Long.MaxValue
+ override def maxStorageMemory: Long = Long.MaxValue
+
+ private var oom = false
+
+ def markExecutionAsOutOfMemory(): Unit = {
+ oom = true
+ }
+}
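
GrantEverythingMemoryManager grants every request in full, except that markExecutionAsOutOfMemory
forces exactly one subsequent doAcquireExecutionMemory call to return 0, which is handy for
driving spill paths deterministically. A minimal usage sketch, assuming it runs inside a
SparkFunSuite test in the org.apache.spark.memory package (doAcquireExecutionMemory is
private[memory], and the evictedBlocks buffer is ignored by this stub):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.{BlockId, BlockStatus}

    val mm = new GrantEverythingMemoryManager(new SparkConf())
    val evicted = new ArrayBuffer[(BlockId, BlockStatus)]
    assert(mm.doAcquireExecutionMemory(100L, evicted) === 100L) // grants in full
    mm.markExecutionAsOutOfMemory()
    assert(mm.doAcquireExecutionMemory(100L, evicted) === 0L)   // denied exactly once
    assert(mm.doAcquireExecutionMemory(100L, evicted) === 100L) // back to granting
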
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 36e4566310..1265087743 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -19,10 +19,14 @@ package org.apache.spark.memory
import java.util.concurrent.atomic.AtomicLong
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
import org.mockito.Matchers.{any, anyLong}
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
+import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.MemoryStore
@@ -126,6 +130,136 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
"ensure free space should not have been called!")
}
+
+ /**
+ * Create a MemoryManager with the specified execution memory limit and no storage memory.
+ */
+ protected def createMemoryManager(maxExecutionMemory: Long): MemoryManager
+
+ // -- Tests of sharing of execution memory between tasks ----------------------------------------
+ // Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.
+
+ implicit val ec = ExecutionContext.global
+
+ test("single task requesting execution memory") {
+ val manager = createMemoryManager(1000L)
+ val taskMemoryManager = new TaskMemoryManager(manager, 0)
+
+ assert(taskMemoryManager.acquireExecutionMemory(100L) === 100L)
+ assert(taskMemoryManager.acquireExecutionMemory(400L) === 400L)
+ assert(taskMemoryManager.acquireExecutionMemory(400L) === 400L)
+ assert(taskMemoryManager.acquireExecutionMemory(200L) === 100L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L) === 0L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L) === 0L)
+
+ taskMemoryManager.releaseExecutionMemory(500L)
+ assert(taskMemoryManager.acquireExecutionMemory(300L) === 300L)
+ assert(taskMemoryManager.acquireExecutionMemory(300L) === 200L)
+
+ taskMemoryManager.cleanUpAllAllocatedMemory()
+ assert(taskMemoryManager.acquireExecutionMemory(1000L) === 1000L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L) === 0L)
+ }
+
+ test("two tasks requesting full execution memory") {
+ val memoryManager = createMemoryManager(1000L)
+ val t1MemManager = new TaskMemoryManager(memoryManager, 1)
+ val t2MemManager = new TaskMemoryManager(memoryManager, 2)
+ val futureTimeout: Duration = 20.seconds
+
+ // Have both tasks request 500 bytes, then wait until both requests have been granted:
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L) }
+ assert(Await.result(t1Result1, futureTimeout) === 500L)
+ assert(Await.result(t2Result1, futureTimeout) === 500L)
+
+ // Have both tasks each request 500 bytes more; both should immediately return 0 as they are
+ // both now at 1 / N
+ val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L) }
+ assert(Await.result(t1Result2, 200.millis) === 0L)
+ assert(Await.result(t2Result2, 200.millis) === 0L)
+ }
+
+ test("two tasks cannot grow past 1 / N of execution memory") {
+ val memoryManager = createMemoryManager(1000L)
+ val t1MemManager = new TaskMemoryManager(memoryManager, 1)
+ val t2MemManager = new TaskMemoryManager(memoryManager, 2)
+ val futureTimeout: Duration = 20.seconds
+
+ // Have both tasks request 250 bytes, then wait until both requests have been granted:
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L) }
+ assert(Await.result(t1Result1, futureTimeout) === 250L)
+ assert(Await.result(t2Result1, futureTimeout) === 250L)
+
+ // Have both tasks each request 500 bytes more.
+ // We should only grant 250 bytes to each of them on this second request
+ val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L) }
+ assert(Await.result(t1Result2, futureTimeout) === 250L)
+ assert(Await.result(t2Result2, futureTimeout) === 250L)
+ }
+
+ test("tasks can block to get at least 1 / 2N of execution memory") {
+ val memoryManager = createMemoryManager(1000L)
+ val t1MemManager = new TaskMemoryManager(memoryManager, 1)
+ val t2MemManager = new TaskMemoryManager(memoryManager, 2)
+ val futureTimeout: Duration = 20.seconds
+
+ // t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L) }
+ assert(Await.result(t1Result1, futureTimeout) === 1000L)
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L) }
+ // Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult
+ // to make sure the other thread blocks for some time otherwise.
+ Thread.sleep(300)
+ t1MemManager.releaseExecutionMemory(250L)
+ // The memory freed from t1 should now be granted to t2.
+ assert(Await.result(t2Result1, futureTimeout) === 250L)
+ // Further requests by t2 should be denied immediately because it now has 1 / 2N of the memory.
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L) }
+ assert(Await.result(t2Result2, 200.millis) === 0L)
+ }
+
+ test("TaskMemoryManager.cleanUpAllAllocatedMemory") {
+ val memoryManager = createMemoryManager(1000L)
+ val t1MemManager = new TaskMemoryManager(memoryManager, 1)
+ val t2MemManager = new TaskMemoryManager(memoryManager, 2)
+ val futureTimeout: Duration = 20.seconds
+
+ // t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L) }
+ assert(Await.result(t1Result1, futureTimeout) === 1000L)
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L) }
+ // Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult
+ // to make sure the other thread blocks for some time otherwise.
+ Thread.sleep(300)
+ // t1 releases all of its memory, so t2 should be able to grab all of the memory
+ t1MemManager.cleanUpAllAllocatedMemory()
+ assert(Await.result(t2Result1, futureTimeout) === 500L)
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L) }
+ assert(Await.result(t2Result2, futureTimeout) === 500L)
+ val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L) }
+ assert(Await.result(t2Result3, 200.millis) === 0L)
+ }
+
+ test("tasks should not be granted a negative amount of execution memory") {
+ // This is a regression test for SPARK-4715.
+ val memoryManager = createMemoryManager(1000L)
+ val t1MemManager = new TaskMemoryManager(memoryManager, 1)
+ val t2MemManager = new TaskMemoryManager(memoryManager, 2)
+ val futureTimeout: Duration = 20.seconds
+
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L) }
+ assert(Await.result(t1Result1, futureTimeout) === 700L)
+
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L) }
+ assert(Await.result(t2Result1, futureTimeout) === 300L)
+
+ val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L) }
+ assert(Await.result(t1Result2, 200.millis) === 0L)
+ }
}
private object MemoryManagerSuite {
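
The new tests encode the arbitration policy carried over from ShuffleMemoryManager: with N active
tasks, each task is capped at 1/N of the pool, and a request blocks until the task can hold at
least 1/2N. A self-contained sketch of that loop, for orientation only (the real logic lives in
MemoryManager and differs in detail):

    import scala.collection.mutable

    // Sketch of the 1/N-cap, 1/2N-floor policy; not the actual Spark implementation.
    class FairPool(maxMemory: Long) {
      private val memoryForTask = mutable.HashMap[Long, Long]()

      def acquire(taskId: Long, numBytes: Long): Long = synchronized {
        if (!memoryForTask.contains(taskId)) {
          memoryForTask(taskId) = 0L
          notifyAll() // a new task lowers 1/N for everyone, so wake any blocked waiters
        }
        var granted = -1L
        while (granted < 0) {
          val n = memoryForTask.size
          val curMem = memoryForTask(taskId)
          val free = maxMemory - memoryForTask.values.sum
          // Cap each task at 1/N of the pool, and never grant more than is free.
          val toGrant = math.min(math.max(0L, maxMemory / n - curMem), math.min(numBytes, free))
          if (toGrant < numBytes && curMem + toGrant < maxMemory / (2 * n)) {
            wait() // block until a release lets this task reach at least 1/2N
          } else {
            memoryForTask(taskId) = curMem + toGrant
            granted = toGrant
          }
        }
        granted
      }

      def release(taskId: Long, numBytes: Long): Unit = synchronized {
        memoryForTask(taskId) = math.max(0L, memoryForTask(taskId) - numBytes)
        notifyAll()
      }
    }

This reproduces the behavior the tests check: a second task requesting memory while the first
holds everything blocks until a release lets it reach 1/2N, and a task already at 1/2N gets an
immediate 0 rather than blocking.
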
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
new file mode 100644
index 0000000000..4b4c3b0311
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import org.apache.spark.{SparkEnv, TaskContextImpl, TaskContext}
+
+/**
+ * Helper methods for mocking out memory-management-related classes in tests.
+ */
+object MemoryTestingUtils {
+ def fakeTaskContext(env: SparkEnv): TaskContext = {
+ val taskMemoryManager = new TaskMemoryManager(env.memoryManager, 0)
+ new TaskContextImpl(
+ stageId = 0,
+ partitionId = 0,
+ taskAttemptId = 0,
+ attemptNumber = 0,
+ taskMemoryManager = taskMemoryManager,
+ metricsSystem = env.metricsSystem,
+ internalAccumulators = Seq.empty)
+ }
+}
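
This helper is what lets the collection suites further down construct spillable data structures
outside a real task. Typical usage, lifted from the ExternalSorterSuite changes later in this
diff:

    val context = MemoryTestingUtils.fakeTaskContext(sc.env)
    val ord = implicitly[Ordering[Int]]
    val sorter = new ExternalSorter[Int, Int, Int](
      context, None, Some(new HashPartitioner(3)), Some(ord), None)
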
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 6cae1f871e..885c450d6d 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -36,27 +36,35 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
maxExecutionMem: Long,
maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
val mm = new StaticMemoryManager(
- conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem)
+ conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem, numCores = 1)
val ms = makeMemoryStore(mm)
(mm, ms)
}
+ override protected def createMemoryManager(maxMemory: Long): MemoryManager = {
+ new StaticMemoryManager(
+ conf,
+ maxExecutionMemory = maxMemory,
+ maxStorageMemory = 0,
+ numCores = 1)
+ }
+
test("basic execution memory") {
val maxExecutionMem = 1000L
val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
assert(mm.executionMemoryUsed === 0L)
- assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
+ assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
// Acquire up to the max
- assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
+ assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
+ assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
assert(mm.executionMemoryUsed === maxExecutionMem)
mm.releaseExecutionMemory(800L)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
+ assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
mm.releaseExecutionMemory(maxExecutionMem)
@@ -108,10 +116,10 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val dummyBlock = TestBlockId("ain't nobody love like you do")
val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
// Only execution memory should increase
- assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 100L)
- assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 100L)
+ assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
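
The rename from acquireExecutionMemory to doAcquireExecutionMemory in these assertions mirrors a
split in MemoryManager itself: the public entry point now also performs the per-task arbitration
exercised by MemoryManagerSuite, while the do* hook keeps the raw grant/deny accounting. Roughly,
as inferred from this diff (the taskAttemptId parameter and body are assumptions, not the
verbatim source):

    // Public API: arbitrates across tasks, then delegates to the subclass hook.
    def acquireExecutionMemory(
        numBytes: Long,
        taskAttemptId: Long,
        evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
      // ... enforce the 1/N cap and the 1/2N blocking floor for taskAttemptId ...
      doAcquireExecutionMemory(numBytes, evictedBlocks)
    }

    // Subclass hook: raw accounting, as stubbed by GrantEverythingMemoryManager above.
    private[memory] def doAcquireExecutionMemory(
        numBytes: Long,
        evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long
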
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index e7baa50dc2..0c97f2bd89 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -34,11 +34,15 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
* Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
*/
private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = {
- val mm = new UnifiedMemoryManager(conf, maxMemory)
+ val mm = new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
val ms = makeMemoryStore(mm)
(mm, ms)
}
+ override protected def createMemoryManager(maxMemory: Long): MemoryManager = {
+ new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
+ }
+
private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = {
mm invokePrivate PrivateMethod[Long]('storageRegionSize)()
}
@@ -56,18 +60,18 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val maxMemory = 1000L
val (mm, _) = makeThings(maxMemory)
assert(mm.executionMemoryUsed === 0L)
- assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
+ assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
// Acquire up to the max
- assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
+ assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
assert(mm.executionMemoryUsed === maxMemory)
- assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
+ assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
assert(mm.executionMemoryUsed === maxMemory)
mm.releaseExecutionMemory(800L)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
+ assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
mm.releaseExecutionMemory(maxMemory)
@@ -132,12 +136,12 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
require(mm.storageMemoryUsed > storageRegionSize,
s"bad test: storage memory used should exceed the storage region")
// Execution needs to request 250 bytes to evict storage memory
- assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
assert(mm.executionMemoryUsed === 100L)
assert(mm.storageMemoryUsed === 750L)
assertEnsureFreeSpaceNotCalled(ms)
// Execution wants 200 bytes but only 150 are free, so storage is evicted
- assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
assertEnsureFreeSpaceCalled(ms, 200L)
assert(mm.executionMemoryUsed === 300L)
mm.releaseAllStorageMemory()
@@ -151,7 +155,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
s"bad test: storage memory used should be within the storage region")
// Execution cannot evict storage because the latter is within the storage fraction,
// so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
- assert(mm.acquireExecutionMemory(400L, evictedBlocks) === 300L)
+ assert(mm.doAcquireExecutionMemory(400L, evictedBlocks) === 300L)
assert(mm.executionMemoryUsed === 600L)
assert(mm.storageMemoryUsed === 400L)
assertEnsureFreeSpaceNotCalled(ms)
@@ -170,7 +174,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
require(executionRegionSize === expectedExecutionRegionSize,
"bad test: storage region size is unexpected")
// Acquire enough execution memory to exceed the execution region
- assert(mm.acquireExecutionMemory(800L, evictedBlocks) === 800L)
+ assert(mm.doAcquireExecutionMemory(800L, evictedBlocks) === 800L)
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 0L)
assertEnsureFreeSpaceNotCalled(ms)
@@ -188,7 +192,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
mm.releaseExecutionMemory(maxMemory)
mm.releaseStorageMemory(maxMemory)
// Acquire some execution memory again, but this time keep it within the execution region
- assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 0L)
assertEnsureFreeSpaceNotCalled(ms)
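
To make the eviction arithmetic above concrete (numbers taken from the test itself): with
maxMemory = 1000 and storage already holding 750 bytes, the first acquire of 100 fits in the
250 bytes of free space, so no eviction happens. The second request for 200 finds only
1000 - 750 - 100 = 150 bytes free, so the manager asks the memory store to ensure 200 bytes of
space, evicting blocks to cover the 50-byte shortfall, and both requests are granted in full.
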
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
deleted file mode 100644
index 5877aa042d..0000000000
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.mockito.Mockito._
-import org.scalatest.concurrent.Timeouts
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.{SparkFunSuite, TaskContext}
-import org.apache.spark.executor.TaskMetrics
-
-class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
-
- val nextTaskAttemptId = new AtomicInteger()
-
- /** Launch a thread with the given body block and return it. */
- private def startThread(name: String)(body: => Unit): Thread = {
- val thread = new Thread("ShuffleMemorySuite " + name) {
- override def run() {
- try {
- val taskAttemptId = nextTaskAttemptId.getAndIncrement
- val mockTaskContext = mock(classOf[TaskContext], RETURNS_SMART_NULLS)
- val taskMetrics = new TaskMetrics
- when(mockTaskContext.taskAttemptId()).thenReturn(taskAttemptId)
- when(mockTaskContext.taskMetrics()).thenReturn(taskMetrics)
- TaskContext.setTaskContext(mockTaskContext)
- body
- } finally {
- TaskContext.unset()
- }
- }
- }
- thread.start()
- thread
- }
-
- test("single task requesting memory") {
- val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
-
- assert(manager.tryToAcquire(100L) === 100L)
- assert(manager.tryToAcquire(400L) === 400L)
- assert(manager.tryToAcquire(400L) === 400L)
- assert(manager.tryToAcquire(200L) === 100L)
- assert(manager.tryToAcquire(100L) === 0L)
- assert(manager.tryToAcquire(100L) === 0L)
-
- manager.release(500L)
- assert(manager.tryToAcquire(300L) === 300L)
- assert(manager.tryToAcquire(300L) === 200L)
-
- manager.releaseMemoryForThisTask()
- assert(manager.tryToAcquire(1000L) === 1000L)
- assert(manager.tryToAcquire(100L) === 0L)
- }
-
- test("two threads requesting full memory") {
- // Two threads request 500 bytes first, wait for each other to get it, and then request
- // 500 more; we should immediately return 0 as both are now at 1 / N
-
- val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
-
- class State {
- var t1Result1 = -1L
- var t2Result1 = -1L
- var t1Result2 = -1L
- var t2Result2 = -1L
- }
- val state = new State
-
- val t1 = startThread("t1") {
- val r1 = manager.tryToAcquire(500L)
- state.synchronized {
- state.t1Result1 = r1
- state.notifyAll()
- while (state.t2Result1 === -1L) {
- state.wait()
- }
- }
- val r2 = manager.tryToAcquire(500L)
- state.synchronized { state.t1Result2 = r2 }
- }
-
- val t2 = startThread("t2") {
- val r1 = manager.tryToAcquire(500L)
- state.synchronized {
- state.t2Result1 = r1
- state.notifyAll()
- while (state.t1Result1 === -1L) {
- state.wait()
- }
- }
- val r2 = manager.tryToAcquire(500L)
- state.synchronized { state.t2Result2 = r2 }
- }
-
- failAfter(20 seconds) {
- t1.join()
- t2.join()
- }
-
- assert(state.t1Result1 === 500L)
- assert(state.t2Result1 === 500L)
- assert(state.t1Result2 === 0L)
- assert(state.t2Result2 === 0L)
- }
-
-
- test("tasks cannot grow past 1 / N") {
- // Two tasks request 250 bytes first, wait for each other to get it, and then request
- // 500 more; we should only grant 250 bytes to each of them on this second request
-
- val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
-
- class State {
- var t1Result1 = -1L
- var t2Result1 = -1L
- var t1Result2 = -1L
- var t2Result2 = -1L
- }
- val state = new State
-
- val t1 = startThread("t1") {
- val r1 = manager.tryToAcquire(250L)
- state.synchronized {
- state.t1Result1 = r1
- state.notifyAll()
- while (state.t2Result1 === -1L) {
- state.wait()
- }
- }
- val r2 = manager.tryToAcquire(500L)
- state.synchronized { state.t1Result2 = r2 }
- }
-
- val t2 = startThread("t2") {
- val r1 = manager.tryToAcquire(250L)
- state.synchronized {
- state.t2Result1 = r1
- state.notifyAll()
- while (state.t1Result1 === -1L) {
- state.wait()
- }
- }
- val r2 = manager.tryToAcquire(500L)
- state.synchronized { state.t2Result2 = r2 }
- }
-
- failAfter(20 seconds) {
- t1.join()
- t2.join()
- }
-
- assert(state.t1Result1 === 250L)
- assert(state.t2Result1 === 250L)
- assert(state.t1Result2 === 250L)
- assert(state.t2Result2 === 250L)
- }
-
- test("tasks can block to get at least 1 / 2N memory") {
- // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. It sleeps
- // for a bit and releases 250 bytes, which should then be granted to t2. Further requests
- // by t2 will return false right away because it now has 1 / 2N of the memory.
-
- val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
-
- class State {
- var t1Requested = false
- var t2Requested = false
- var t1Result = -1L
- var t2Result = -1L
- var t2Result2 = -1L
- var t2WaitTime = 0L
- }
- val state = new State
-
- val t1 = startThread("t1") {
- state.synchronized {
- state.t1Result = manager.tryToAcquire(1000L)
- state.t1Requested = true
- state.notifyAll()
- while (!state.t2Requested) {
- state.wait()
- }
- }
- // Sleep a bit before releasing our memory; this is hacky but it would be difficult to make
- // sure the other thread blocks for some time otherwise
- Thread.sleep(300)
- manager.release(250L)
- }
-
- val t2 = startThread("t2") {
- state.synchronized {
- while (!state.t1Requested) {
- state.wait()
- }
- state.t2Requested = true
- state.notifyAll()
- }
- val startTime = System.currentTimeMillis()
- val result = manager.tryToAcquire(250L)
- val endTime = System.currentTimeMillis()
- state.synchronized {
- state.t2Result = result
- // A second call should return 0 because we're now already at 1 / 2N
- state.t2Result2 = manager.tryToAcquire(100L)
- state.t2WaitTime = endTime - startTime
- }
- }
-
- failAfter(20 seconds) {
- t1.join()
- t2.join()
- }
-
- // Both threads should've been able to acquire their memory; the second one will have waited
- // until the first one acquired 1000 bytes and then released 250
- state.synchronized {
- assert(state.t1Result === 1000L, "t1 could not allocate memory")
- assert(state.t2Result === 250L, "t2 could not allocate memory")
- assert(state.t2WaitTime > 200, s"t2 waited less than 200 ms (${state.t2WaitTime})")
- assert(state.t2Result2 === 0L, "t1 got extra memory the second time")
- }
- }
-
- test("releaseMemoryForThisTask") {
- // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. It sleeps
- // for a bit and releases all its memory. t2 should now be able to grab all the memory.
-
- val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
-
- class State {
- var t1Requested = false
- var t2Requested = false
- var t1Result = -1L
- var t2Result1 = -1L
- var t2Result2 = -1L
- var t2Result3 = -1L
- var t2WaitTime = 0L
- }
- val state = new State
-
- val t1 = startThread("t1") {
- state.synchronized {
- state.t1Result = manager.tryToAcquire(1000L)
- state.t1Requested = true
- state.notifyAll()
- while (!state.t2Requested) {
- state.wait()
- }
- }
- // Sleep a bit before releasing our memory; this is hacky but it would be difficult to make
- // sure the other task blocks for some time otherwise
- Thread.sleep(300)
- manager.releaseMemoryForThisTask()
- }
-
- val t2 = startThread("t2") {
- state.synchronized {
- while (!state.t1Requested) {
- state.wait()
- }
- state.t2Requested = true
- state.notifyAll()
- }
- val startTime = System.currentTimeMillis()
- val r1 = manager.tryToAcquire(500L)
- val endTime = System.currentTimeMillis()
- val r2 = manager.tryToAcquire(500L)
- val r3 = manager.tryToAcquire(500L)
- state.synchronized {
- state.t2Result1 = r1
- state.t2Result2 = r2
- state.t2Result3 = r3
- state.t2WaitTime = endTime - startTime
- }
- }
-
- failAfter(20 seconds) {
- t1.join()
- t2.join()
- }
-
- // Both tasks should've been able to acquire their memory; the second one will have waited
- // until the first one acquired 1000 bytes and then released all of it
- state.synchronized {
- assert(state.t1Result === 1000L, "t1 could not allocate memory")
- assert(state.t2Result1 === 500L, "t2 didn't get 500 bytes the first time")
- assert(state.t2Result2 === 500L, "t2 didn't get 500 bytes the second time")
- assert(state.t2Result3 === 0L, s"t2 got more bytes a third time (${state.t2Result3})")
- assert(state.t2WaitTime > 200, s"t2 waited less than 200 ms (${state.t2WaitTime})")
- }
- }
-
- test("tasks should not be granted a negative size") {
- val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
- manager.tryToAcquire(700L)
-
- val latch = new CountDownLatch(1)
- startThread("t1") {
- manager.tryToAcquire(300L)
- latch.countDown()
- }
- latch.await() // Wait until `t1` calls `tryToAcquire`
-
- val granted = manager.tryToAcquire(300L)
- assert(0 === granted, "granted is negative")
- }
-}

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index cc44c676b2..6e3f500e15 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -61,7 +61,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem)
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
val store = new BlockManager(name, rpcEnv, master, serializer, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(store.memoryStore)
@@ -261,7 +261,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
val failableTransfer = mock(classOf[BlockTransferService]) // this won't actually work
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
- val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000)
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000, numCores = 1)
val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer, conf,
memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
memManager.setMemoryStore(failableStore.memoryStore)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f3fab33ca2..d49015afcd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -68,7 +68,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem)
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(blockManager.memoryStore)
@@ -823,7 +823,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val memoryManager = new StaticMemoryManager(conf, Long.MaxValue, 1200)
+ val memoryManager = new StaticMemoryManager(
+ conf,
+ maxExecutionMemory = Long.MaxValue,
+ maxStorageMemory = 1200,
+ numCores = 1)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
new JavaSerializer(conf), conf, memoryManager, mapOutputTracker,
shuffleManager, transfer, securityMgr, 0)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 5cb506ea21..dc3185a6d5 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.io.CompressionCodec
-
+import org.apache.spark.memory.MemoryTestingUtils
class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
import TestUtils.{assertNotSpilled, assertSpilled}
@@ -32,8 +32,11 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] =
buf1 ++= buf2
- private def createExternalMap[T] = new ExternalAppendOnlyMap[T, T, ArrayBuffer[T]](
- createCombiner[T], mergeValue[T], mergeCombiners[T])
+ private def createExternalMap[T] = {
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
+ new ExternalAppendOnlyMap[T, T, ArrayBuffer[T]](
+ createCombiner[T], mergeValue[T], mergeCombiners[T], context = context)
+ }
private def createSparkConf(loadDefaults: Boolean, codec: Option[String] = None): SparkConf = {
val conf = new SparkConf(loadDefaults)
@@ -49,23 +52,27 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
conf
}
- test("simple insert") {
+ test("single insert insert") {
val conf = createSparkConf(loadDefaults = false)
sc = new SparkContext("local", "test", conf)
val map = createExternalMap[Int]
-
- // Single insert
map.insert(1, 10)
- var it = map.iterator
+ val it = map.iterator
assert(it.hasNext)
val kv = it.next()
assert(kv._1 === 1 && kv._2 === ArrayBuffer[Int](10))
assert(!it.hasNext)
+ sc.stop()
+ }
- // Multiple insert
+ test("multiple insert") {
+ val conf = createSparkConf(loadDefaults = false)
+ sc = new SparkContext("local", "test", conf)
+ val map = createExternalMap[Int]
+ map.insert(1, 10)
map.insert(2, 20)
map.insert(3, 30)
- it = map.iterator
+ val it = map.iterator
assert(it.hasNext)
assert(it.toSet === Set[(Int, ArrayBuffer[Int])](
(1, ArrayBuffer[Int](10)),
@@ -144,39 +151,22 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local", "test", conf)
val map = createExternalMap[Int]
+ val nullInt = null.asInstanceOf[Int]
map.insert(1, 5)
map.insert(2, 6)
map.insert(3, 7)
- assert(map.size === 3)
- assert(map.iterator.toSet === Set[(Int, Seq[Int])](
- (1, Seq[Int](5)),
- (2, Seq[Int](6)),
- (3, Seq[Int](7))
- ))
-
- // Null keys
- val nullInt = null.asInstanceOf[Int]
+ map.insert(4, nullInt)
map.insert(nullInt, 8)
- assert(map.size === 4)
- assert(map.iterator.toSet === Set[(Int, Seq[Int])](
+ map.insert(nullInt, nullInt)
+ val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.sorted))
+ assert(result === Set[(Int, Seq[Int])](
(1, Seq[Int](5)),
(2, Seq[Int](6)),
(3, Seq[Int](7)),
- (nullInt, Seq[Int](8))
+ (4, Seq[Int](nullInt)),
+ (nullInt, Seq[Int](nullInt, 8))
))
- // Null values
- map.insert(4, nullInt)
- map.insert(nullInt, nullInt)
- assert(map.size === 5)
- val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
- assert(result === Set[(Int, Set[Int])](
- (1, Set[Int](5)),
- (2, Set[Int](6)),
- (3, Set[Int](7)),
- (4, Set[Int](nullInt)),
- (nullInt, Set[Int](nullInt, 8))
- ))
sc.stop()
}
@@ -344,7 +334,9 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
- val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
+ val map =
+ new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _, context = context)
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
// problems if the map fails to group together the objects with the same code (SPARK-2043).
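
One subtlety in the rewritten null-keys test above: in Scala, null.asInstanceOf[Int] unboxes to
0, so the "null" key and value are really 0s, which is why they can be grouped and sorted like
any other Int:

    val nullInt = null.asInstanceOf[Int]
    assert(nullInt == 0)  // unboxing null yields the default value for Int
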
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index e2cb791771..d7b2d07a40 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util.collection
+import org.apache.spark.memory.MemoryTestingUtils
+
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
@@ -98,6 +100,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val conf = createSparkConf(loadDefaults = true, kryo = false)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
@@ -109,7 +112,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
createCombiner _, mergeValue _, mergeCombiners _)
val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
- Some(agg), None, None, None)
+ context, Some(agg), None, None, None)
val collisionPairs = Seq(
("Aa", "BB"), // 2112
@@ -158,8 +161,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val conf = createSparkConf(loadDefaults = true, kryo = false)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
- val sorter = new ExternalSorter[FixedHashObject, Int, Int](Some(agg), None, None, None)
+ val sorter = new ExternalSorter[FixedHashObject, Int, Int](context, Some(agg), None, None, None)
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
// problems if the map fails to group together the objects with the same code (SPARK-2043).
val toInsert = for (i <- 1 to 10; j <- 1 to size) yield (FixedHashObject(j, j % 2), 1)
@@ -180,6 +184,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val conf = createSparkConf(loadDefaults = true, kryo = false)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i)
def mergeValue(buffer: ArrayBuffer[Int], i: Int): ArrayBuffer[Int] = buffer += i
@@ -188,7 +193,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners)
- val sorter = new ExternalSorter[Int, Int, ArrayBuffer[Int]](Some(agg), None, None, None)
+ val sorter =
+ new ExternalSorter[Int, Int, ArrayBuffer[Int]](context, Some(agg), None, None, None)
sorter.insertAll(
(1 to size).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
assert(sorter.numSpills > 0, "sorter did not spill")
@@ -204,6 +210,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val conf = createSparkConf(loadDefaults = true, kryo = false)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
@@ -214,7 +221,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
createCombiner, mergeValue, mergeCombiners)
val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
- Some(agg), None, None, None)
+ context, Some(agg), None, None, None)
sorter.insertAll((1 to size).iterator.map(i => (i.toString, i.toString)) ++ Iterator(
(null.asInstanceOf[String], "1"),
@@ -271,31 +278,32 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
private def emptyDataStream(conf: SparkConf) {
conf.set("spark.shuffle.manager", "sort")
sc = new SparkContext("local", "test", conf)
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
val ord = implicitly[Ordering[Int]]
// Both aggregator and ordering
val sorter = new ExternalSorter[Int, Int, Int](
- Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
+ context, Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
assert(sorter.iterator.toSeq === Seq())
sorter.stop()
// Only aggregator
val sorter2 = new ExternalSorter[Int, Int, Int](
- Some(agg), Some(new HashPartitioner(3)), None, None)
+ context, Some(agg), Some(new HashPartitioner(3)), None, None)
assert(sorter2.iterator.toSeq === Seq())
sorter2.stop()
// Only ordering
val sorter3 = new ExternalSorter[Int, Int, Int](
- None, Some(new HashPartitioner(3)), Some(ord), None)
+ context, None, Some(new HashPartitioner(3)), Some(ord), None)
assert(sorter3.iterator.toSeq === Seq())
sorter3.stop()
// Neither aggregator nor ordering
val sorter4 = new ExternalSorter[Int, Int, Int](
- None, Some(new HashPartitioner(3)), None, None)
+ context, None, Some(new HashPartitioner(3)), None, None)
assert(sorter4.iterator.toSeq === Seq())
sorter4.stop()
}
@@ -303,6 +311,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
private def fewElementsPerPartition(conf: SparkConf) {
conf.set("spark.shuffle.manager", "sort")
sc = new SparkContext("local", "test", conf)
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
val ord = implicitly[Ordering[Int]]
@@ -313,28 +322,28 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
// Both aggregator and ordering
val sorter = new ExternalSorter[Int, Int, Int](
- Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
+ context, Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
sorter.insertAll(elements.iterator)
assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter.stop()
// Only aggregator
val sorter2 = new ExternalSorter[Int, Int, Int](
- Some(agg), Some(new HashPartitioner(7)), None, None)
+ context, Some(agg), Some(new HashPartitioner(7)), None, None)
sorter2.insertAll(elements.iterator)
assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter2.stop()
// Only ordering
val sorter3 = new ExternalSorter[Int, Int, Int](
- None, Some(new HashPartitioner(7)), Some(ord), None)
+ context, None, Some(new HashPartitioner(7)), Some(ord), None)
sorter3.insertAll(elements.iterator)
assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter3.stop()
// Neither aggregator nor ordering
val sorter4 = new ExternalSorter[Int, Int, Int](
- None, Some(new HashPartitioner(7)), None, None)
+ context, None, Some(new HashPartitioner(7)), None, None)
sorter4.insertAll(elements.iterator)
assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter4.stop()
@@ -345,12 +354,13 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
conf.set("spark.shuffle.manager", "sort")
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local", "test", conf)
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val ord = implicitly[Ordering[Int]]
val elements = Iterator((1, 1), (5, 5)) ++ (0 until size).iterator.map(x => (2, 2))
val sorter = new ExternalSorter[Int, Int, Int](
- None, Some(new HashPartitioner(7)), Some(ord), None)
+ context, None, Some(new HashPartitioner(7)), Some(ord), None)
sorter.insertAll(elements)
assert(sorter.numSpills > 0, "sorter did not spill")
val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
@@ -432,8 +442,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val diskBlockManager = sc.env.blockManager.diskBlockManager
val ord = implicitly[Ordering[Int]]
val expectedSize = if (withFailures) size - 1 else size
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val sorter = new ExternalSorter[Int, Int, Int](
- None, Some(new HashPartitioner(3)), Some(ord), None)
+ context, None, Some(new HashPartitioner(3)), Some(ord), None)
if (withFailures) {
intercept[SparkException] {
sorter.insertAll((0 until size).iterator.map { i =>
@@ -501,7 +512,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
None
}
val ord = if (withOrdering) Some(implicitly[Ordering[Int]]) else None
- val sorter = new ExternalSorter[Int, Int, Int](agg, Some(new HashPartitioner(3)), ord, None)
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
+ val sorter =
+ new ExternalSorter[Int, Int, Int](context, agg, Some(new HashPartitioner(3)), ord, None)
sorter.insertAll((0 until size).iterator.map { i => (i / 4, i) })
if (withSpilling) {
assert(sorter.numSpills > 0, "sorter did not spill")
@@ -538,8 +551,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val testData = Array.tabulate(size) { _ => rand.nextInt().toString }
+ val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val sorter1 = new ExternalSorter[String, String, String](
- None, None, Some(wrongOrdering), None)
+ context, None, None, Some(wrongOrdering), None)
val thrown = intercept[IllegalArgumentException] {
sorter1.insertAll(testData.iterator.map(i => (i, i)))
assert(sorter1.numSpills > 0, "sorter did not spill")
@@ -561,7 +575,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
createCombiner, mergeValue, mergeCombiners)
val sorter2 = new ExternalSorter[String, String, ArrayBuffer[String]](
- Some(agg), None, None, None)
+ context, Some(agg), None, None, None)
sorter2.insertAll(testData.iterator.map(i => (i, i)))
assert(sorter2.numSpills > 0, "sorter did not spill")