Diffstat (limited to 'core/src/test')
 -rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala  111
 -rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala       36
 -rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala                7
 -rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala           143
4 files changed, 136 insertions, 161 deletions
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
new file mode 100644
index 0000000000..6061e544e7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.hash
+
+import java.io.{File, FileWriter}
+
+import scala.language.reflectiveCalls
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.shuffle.FileShuffleBlockManager
+import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
+
+class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
+ private val testConf = new SparkConf(false)
+
+ private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
+ assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
+ assert (segment1.offset === segment2.offset)
+ assert (segment1.length === segment2.length)
+ }
+
+ test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
+
+ val conf = new SparkConf(false)
+ // Reset the serializer stream after EACH object write. This ensures that bytes are appended
+ // after an object is written, so if any codepath assumes writeObject marks the end of the data,
+ // this should flush those bugs out. This was a common bug in ExternalAppendOnlyMap, etc.
+ conf.set("spark.serializer.objectStreamReset", "1")
+ conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+ conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
+
+ sc = new SparkContext("local", "test", conf)
+
+ val shuffleBlockManager =
+ SparkEnv.get.shuffleManager.shuffleBlockManager.asInstanceOf[FileShuffleBlockManager]
+
+ val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
+ new ShuffleWriteMetrics)
+ for (writer <- shuffle1.writers) {
+ writer.write("test1")
+ writer.write("test2")
+ }
+ for (writer <- shuffle1.writers) {
+ writer.commitAndClose()
+ }
+
+ val shuffle1Segment = shuffle1.writers(0).fileSegment()
+ shuffle1.releaseWriters(success = true)
+
+ val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf),
+ new ShuffleWriteMetrics)
+
+ for (writer <- shuffle2.writers) {
+ writer.write("test3")
+ writer.write("test4")
+ }
+ for (writer <- shuffle2.writers) {
+ writer.commitAndClose()
+ }
+ val shuffle2Segment = shuffle2.writers(0).fileSegment()
+ shuffle2.releaseWriters(success = true)
+
+ // Now comes the test:
+ // Write to shuffle 3 and close it, but before registering it, check that the file lengths for
+ // the previous task (for shuffle1) are the same as the recorded segments. Earlier, we inferred
+ // the length of a block from the remaining data in the file, which could mess things up when
+ // concurrent reads and writes happen to the same shuffle group.
+
+ val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
+ new ShuffleWriteMetrics)
+ for (writer <- shuffle3.writers) {
+ writer.write("test3")
+ writer.write("test4")
+ }
+ for (writer <- shuffle3.writers) {
+ writer.commitAndClose()
+ }
+ // check before we register.
+ checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get)
+ shuffle3.releaseWriters(success = true)
+ checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get)
+ shuffleBlockManager.removeShuffle(1)
+
+ }
+
+
+ def writeToFile(file: File, numBytes: Int) {
+ val writer = new FileWriter(file, true)
+ for (i <- 0 until numBytes) writer.write(i)
+ writer.close()
+ }
+}
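
The invariant this new suite protects can be summarised with a small sketch; the names below are illustrative only and not part of the Spark API. The point is that each map task's segment is recorded as an explicit (offset, length) pair when its writer commits, instead of being inferred from whatever data currently follows it in the consolidated shuffle file.

import java.io.File

// Illustrative only (not Spark API): a consolidated shuffle file holds one segment per map task.
case class Segment(file: File, offset: Long, length: Long)

object SegmentSketch {
  // Fragile approach the test guards against: inferring a segment's length from the bytes left in
  // the file, which changes as soon as another map task appends to the same shuffle group.
  def inferredLength(file: File, offset: Long): Long = file.length() - offset

  // Behaviour the test asserts: the (offset, length) pair is captured when the writer commits, so
  // later appends to the consolidated file cannot change it.
  def recordedSegment(file: File, offset: Long, bytesWritten: Long): Segment =
    Segment(file, offset, bytesWritten)
}
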
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
index fbfcb5156d..3c86f6bafc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
@@ -60,11 +60,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
}
// 3rd block is going to fail
- doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any())
- doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any())
- doAnswer(answer).when(blockManager).getLocalFromDisk(meq(blIds(2)), any())
- doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
- doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
+ doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any())
+ doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any())
+ doAnswer(answer).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any())
+ doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any())
+ doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any())
val bmId = BlockManagerId("test-client", "test-client", 1)
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
@@ -76,24 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
iterator.initialize()
- // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
- verify(blockManager, times(0)).getLocalFromDisk(any(), any())
+ // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk.
+ verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
// the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully
assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined")
- verify(blockManager, times(1)).getLocalFromDisk(any(), any())
+ verify(blockManager, times(1)).getLocalShuffleFromDisk(any(), any())
assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined")
- verify(blockManager, times(2)).getLocalFromDisk(any(), any())
+ verify(blockManager, times(2)).getLocalShuffleFromDisk(any(), any())
assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
// 3rd fetch should be failed
intercept[Exception] {
iterator.next()
}
- verify(blockManager, times(3)).getLocalFromDisk(any(), any())
+ verify(blockManager, times(3)).getLocalShuffleFromDisk(any(), any())
}
@@ -115,11 +115,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
val optItr = mock(classOf[Option[Iterator[Any]]])
// All blocks should be fetched successfully
- doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any())
- doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any())
- doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(2)), any())
- doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
- doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
+ doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any())
+ doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any())
+ doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any())
+ doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any())
+ doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any())
val bmId = BlockManagerId("test-client", "test-client", 1)
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
@@ -131,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
iterator.initialize()
- // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
- verify(blockManager, times(0)).getLocalFromDisk(any(), any())
+ // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk.
+ verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any())
assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined")
@@ -145,7 +145,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
- verify(blockManager, times(5)).getLocalFromDisk(any(), any())
+ verify(blockManager, times(5)).getLocalShuffleFromDisk(any(), any())
}
test("block fetch from remote fails using BasicBlockFetcherIterator") {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index bdcea07e57..14ffadab99 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -49,6 +49,7 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.language.postfixOps
+import org.apache.spark.shuffle.ShuffleBlockManager
class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
with PrivateMethodTester {
@@ -823,11 +824,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// be nice to refactor classes involved in disk storage in a way that
// allows for easier testing.
val blockManager = mock(classOf[BlockManager])
- val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
- when(shuffleBlockManager.conf).thenReturn(conf)
- val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
-
when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
+ val diskBlockManager = new DiskBlockManager(blockManager, conf)
+
val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
val mapped = diskStoreMapped.getBytes(blockId).get
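
The same replacement recurs in DiskBlockManagerSuite below: instead of standing up a ShuffleBlockManager, the tests now mock the enclosing BlockManager and stub only its conf accessor, which appears to be the only BlockManager state these constructions need. A minimal sketch of that shape, with illustrative stand-in types rather than the real Spark classes:

import org.mockito.Mockito.{mock, when}

// Illustrative stand-ins, not the real SparkConf / BlockManager / DiskBlockManager.
class Conf { def get(key: String, default: String): String = default }
trait Owner { def conf: Conf }
class DiskLayer(owner: Owner) {
  // Only owner.conf is touched here, so stubbing that single accessor is enough for the test.
  val localDir: String = owner.conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))
}

object ConfStubSketch {
  def main(args: Array[String]): Unit = {
    val owner = mock(classOf[Owner])
    when(owner.conf).thenReturn(new Conf)
    val disk = new DiskLayer(owner)   // no concrete collaborator needed
    println(disk.localDir)
  }
}
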
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index aabaeadd7a..26082ded8c 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -26,6 +26,7 @@ import scala.language.reflectiveCalls
import akka.actor.Props
import com.google.common.io.Files
+import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.SparkConf
@@ -40,18 +41,8 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
private var rootDir1: File = _
private var rootDirs: String = _
- // This suite focuses primarily on consolidation features,
- // so we coerce consolidation if not already enabled.
- testConf.set("spark.shuffle.consolidateFiles", "true")
-
- private val shuffleManager = new HashShuffleManager(testConf.clone)
-
- val shuffleBlockManager = new ShuffleBlockManager(null, shuffleManager) {
- override def conf = testConf.clone
- var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
- override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
- }
-
+ val blockManager = mock(classOf[BlockManager])
+ when(blockManager.conf).thenReturn(testConf)
var diskBlockManager: DiskBlockManager = _
override def beforeAll() {
@@ -73,22 +64,17 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
override def beforeEach() {
val conf = testConf.clone
conf.set("spark.local.dir", rootDirs)
- diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
- shuffleBlockManager.idToSegmentMap.clear()
+ diskBlockManager = new DiskBlockManager(blockManager, conf)
}
override def afterEach() {
diskBlockManager.stop()
- shuffleBlockManager.idToSegmentMap.clear()
}
test("basic block creation") {
val blockId = new TestBlockId("test")
- assertSegmentEquals(blockId, blockId.name, 0, 0)
-
val newFile = diskBlockManager.getFile(blockId)
writeToFile(newFile, 10)
- assertSegmentEquals(blockId, blockId.name, 0, 10)
assert(diskBlockManager.containsBlock(blockId))
newFile.delete()
assert(!diskBlockManager.containsBlock(blockId))
@@ -101,127 +87,6 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
assert(diskBlockManager.getAllBlocks.toSet === ids.toSet)
}
- test("block appending") {
- val blockId = new TestBlockId("test")
- val newFile = diskBlockManager.getFile(blockId)
- writeToFile(newFile, 15)
- assertSegmentEquals(blockId, blockId.name, 0, 15)
- val newFile2 = diskBlockManager.getFile(blockId)
- assert(newFile === newFile2)
- writeToFile(newFile2, 12)
- assertSegmentEquals(blockId, blockId.name, 0, 27)
- newFile.delete()
- }
-
- test("block remapping") {
- val filename = "test"
- val blockId0 = new ShuffleBlockId(1, 2, 3)
- val newFile = diskBlockManager.getFile(filename)
- writeToFile(newFile, 15)
- shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15)
- assertSegmentEquals(blockId0, filename, 0, 15)
-
- val blockId1 = new ShuffleBlockId(1, 2, 4)
- val newFile2 = diskBlockManager.getFile(filename)
- writeToFile(newFile2, 12)
- shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12)
- assertSegmentEquals(blockId1, filename, 15, 12)
-
- assert(newFile === newFile2)
- newFile.delete()
- }
-
- private def checkSegments(segment1: FileSegment, segment2: FileSegment) {
- assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath)
- assert (segment1.offset === segment2.offset)
- assert (segment1.length === segment2.length)
- }
-
- test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") {
-
- val serializer = new JavaSerializer(testConf)
- val confCopy = testConf.clone
- // reset after EACH object write. This is to ensure that there are bytes appended after
- // an object is written. So if the codepaths assume writeObject is end of data, this should
- // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc.
- confCopy.set("spark.serializer.objectStreamReset", "1")
-
- val securityManager = new org.apache.spark.SecurityManager(confCopy)
- // Do not use the shuffleBlockManager above !
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy,
- securityManager)
- val master = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))),
- confCopy)
- val store = new BlockManager("<driver>", actorSystem, master , serializer, confCopy,
- securityManager, null, shuffleManager)
-
- try {
-
- val shuffleManager = store.shuffleBlockManager
-
- val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer, new ShuffleWriteMetrics)
- for (writer <- shuffle1.writers) {
- writer.write("test1")
- writer.write("test2")
- }
- for (writer <- shuffle1.writers) {
- writer.commitAndClose()
- }
-
- val shuffle1Segment = shuffle1.writers(0).fileSegment()
- shuffle1.releaseWriters(success = true)
-
- val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf),
- new ShuffleWriteMetrics)
-
- for (writer <- shuffle2.writers) {
- writer.write("test3")
- writer.write("test4")
- }
- for (writer <- shuffle2.writers) {
- writer.commitAndClose()
- }
- val shuffle2Segment = shuffle2.writers(0).fileSegment()
- shuffle2.releaseWriters(success = true)
-
- // Now comes the test :
- // Write to shuffle 3; and close it, but before registering it, check if the file lengths for
- // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length
- // of block based on remaining data in file : which could mess things up when there is concurrent read
- // and writes happening to the same shuffle group.
-
- val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
- new ShuffleWriteMetrics)
- for (writer <- shuffle3.writers) {
- writer.write("test3")
- writer.write("test4")
- }
- for (writer <- shuffle3.writers) {
- writer.commitAndClose()
- }
- // check before we register.
- checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
- shuffle3.releaseWriters(success = true)
- checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0)))
- shuffleManager.removeShuffle(1)
- } finally {
-
- if (store != null) {
- store.stop()
- }
- actorSystem.shutdown()
- actorSystem.awaitTermination()
- }
- }
-
- def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
- val segment = diskBlockManager.getBlockLocation(blockId)
- assert(segment.file.getName === filename)
- assert(segment.offset === offset)
- assert(segment.length === length)
- }
-
def writeToFile(file: File, numBytes: Int) {
val writer = new FileWriter(file, true)
for (i <- 0 until numBytes) writer.write(i)