diff options
Diffstat (limited to 'core/src/test')
4 files changed, 136 insertions, 161 deletions
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala new file mode 100644 index 0000000000..6061e544e7 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.hash + +import java.io.{File, FileWriter} + +import scala.language.reflectiveCalls + +import org.scalatest.FunSuite + +import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf} +import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.shuffle.FileShuffleBlockManager +import org.apache.spark.storage.{ShuffleBlockId, FileSegment} + +class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { + private val testConf = new SparkConf(false) + + private def checkSegments(segment1: FileSegment, segment2: FileSegment) { + assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath) + assert (segment1.offset === segment2.offset) + assert (segment1.length === segment2.length) + } + + test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") { + + val conf = new SparkConf(false) + // reset after EACH object write. This is to ensure that there are bytes appended after + // an object is written. So if the codepaths assume writeObject is end of data, this should + // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc. + conf.set("spark.serializer.objectStreamReset", "1") + conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") + conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager") + + sc = new SparkContext("local", "test", conf) + + val shuffleBlockManager = + SparkEnv.get.shuffleManager.shuffleBlockManager.asInstanceOf[FileShuffleBlockManager] + + val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf), + new ShuffleWriteMetrics) + for (writer <- shuffle1.writers) { + writer.write("test1") + writer.write("test2") + } + for (writer <- shuffle1.writers) { + writer.commitAndClose() + } + + val shuffle1Segment = shuffle1.writers(0).fileSegment() + shuffle1.releaseWriters(success = true) + + val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf), + new ShuffleWriteMetrics) + + for (writer <- shuffle2.writers) { + writer.write("test3") + writer.write("test4") + } + for (writer <- shuffle2.writers) { + writer.commitAndClose() + } + val shuffle2Segment = shuffle2.writers(0).fileSegment() + shuffle2.releaseWriters(success = true) + + // Now comes the test : + // Write to shuffle 3; and close it, but before registering it, check if the file lengths for + // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length + // of block based on remaining data in file : which could mess things up when there is concurrent read + // and writes happening to the same shuffle group. + + val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf), + new ShuffleWriteMetrics) + for (writer <- shuffle3.writers) { + writer.write("test3") + writer.write("test4") + } + for (writer <- shuffle3.writers) { + writer.commitAndClose() + } + // check before we register. + checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get) + shuffle3.releaseWriters(success = true) + checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)).left.get) + shuffleBlockManager.removeShuffle(1) + + } + + + def writeToFile(file: File, numBytes: Int) { + val writer = new FileWriter(file, true) + for (i <- 0 until numBytes) writer.write(i) + writer.close() + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala index fbfcb5156d..3c86f6bafc 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala @@ -60,11 +60,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { } // 3rd block is going to fail - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any()) - doAnswer(answer).when(blockManager).getLocalFromDisk(meq(blIds(2)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any()) + doAnswer(answer).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any()) val bmId = BlockManagerId("test-client", "test-client", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( @@ -76,24 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { iterator.initialize() - // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk. - verify(blockManager, times(0)).getLocalFromDisk(any(), any()) + // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk. + verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined") - verify(blockManager, times(1)).getLocalFromDisk(any(), any()) + verify(blockManager, times(1)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element") assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined") - verify(blockManager, times(2)).getLocalFromDisk(any(), any()) + verify(blockManager, times(2)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements") // 3rd fetch should be failed intercept[Exception] { iterator.next() } - verify(blockManager, times(3)).getLocalFromDisk(any(), any()) + verify(blockManager, times(3)).getLocalShuffleFromDisk(any(), any()) } @@ -115,11 +115,11 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { val optItr = mock(classOf[Option[Iterator[Any]]]) // All blocks should be fetched successfully - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(0)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(1)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(2)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any()) - doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(0)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(1)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(2)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(3)), any()) + doReturn(optItr).when(blockManager).getLocalShuffleFromDisk(meq(blIds(4)), any()) val bmId = BlockManagerId("test-client", "test-client", 1) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( @@ -131,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { iterator.initialize() - // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk. - verify(blockManager, times(0)).getLocalFromDisk(any(), any()) + // Without exhausting the iterator, the iterator should be lazy and not call getLocalShuffleFromDisk. + verify(blockManager, times(0)).getLocalShuffleFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined") @@ -145,7 +145,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements") assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined") - verify(blockManager, times(5)).getLocalFromDisk(any(), any()) + verify(blockManager, times(5)).getLocalShuffleFromDisk(any(), any()) } test("block fetch from remote fails using BasicBlockFetcherIterator") { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index bdcea07e57..14ffadab99 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -49,6 +49,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.implicitConversions import scala.language.postfixOps +import org.apache.spark.shuffle.ShuffleBlockManager class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester { @@ -823,11 +824,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter // be nice to refactor classes involved in disk storage in a way that // allows for easier testing. val blockManager = mock(classOf[BlockManager]) - val shuffleBlockManager = mock(classOf[ShuffleBlockManager]) - when(shuffleBlockManager.conf).thenReturn(conf) - val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) - when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString)) + val diskBlockManager = new DiskBlockManager(blockManager, conf) + val diskStoreMapped = new DiskStore(blockManager, diskBlockManager) diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY) val mapped = diskStoreMapped.getBytes(blockId).get diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index aabaeadd7a..26082ded8c 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -26,6 +26,7 @@ import scala.language.reflectiveCalls import akka.actor.Props import com.google.common.io.Files +import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} import org.apache.spark.SparkConf @@ -40,18 +41,8 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before private var rootDir1: File = _ private var rootDirs: String = _ - // This suite focuses primarily on consolidation features, - // so we coerce consolidation if not already enabled. - testConf.set("spark.shuffle.consolidateFiles", "true") - - private val shuffleManager = new HashShuffleManager(testConf.clone) - - val shuffleBlockManager = new ShuffleBlockManager(null, shuffleManager) { - override def conf = testConf.clone - var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]() - override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id) - } - + val blockManager = mock(classOf[BlockManager]) + when(blockManager.conf).thenReturn(testConf) var diskBlockManager: DiskBlockManager = _ override def beforeAll() { @@ -73,22 +64,17 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before override def beforeEach() { val conf = testConf.clone conf.set("spark.local.dir", rootDirs) - diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) - shuffleBlockManager.idToSegmentMap.clear() + diskBlockManager = new DiskBlockManager(blockManager, conf) } override def afterEach() { diskBlockManager.stop() - shuffleBlockManager.idToSegmentMap.clear() } test("basic block creation") { val blockId = new TestBlockId("test") - assertSegmentEquals(blockId, blockId.name, 0, 0) - val newFile = diskBlockManager.getFile(blockId) writeToFile(newFile, 10) - assertSegmentEquals(blockId, blockId.name, 0, 10) assert(diskBlockManager.containsBlock(blockId)) newFile.delete() assert(!diskBlockManager.containsBlock(blockId)) @@ -101,127 +87,6 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before assert(diskBlockManager.getAllBlocks.toSet === ids.toSet) } - test("block appending") { - val blockId = new TestBlockId("test") - val newFile = diskBlockManager.getFile(blockId) - writeToFile(newFile, 15) - assertSegmentEquals(blockId, blockId.name, 0, 15) - val newFile2 = diskBlockManager.getFile(blockId) - assert(newFile === newFile2) - writeToFile(newFile2, 12) - assertSegmentEquals(blockId, blockId.name, 0, 27) - newFile.delete() - } - - test("block remapping") { - val filename = "test" - val blockId0 = new ShuffleBlockId(1, 2, 3) - val newFile = diskBlockManager.getFile(filename) - writeToFile(newFile, 15) - shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15) - assertSegmentEquals(blockId0, filename, 0, 15) - - val blockId1 = new ShuffleBlockId(1, 2, 4) - val newFile2 = diskBlockManager.getFile(filename) - writeToFile(newFile2, 12) - shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12) - assertSegmentEquals(blockId1, filename, 15, 12) - - assert(newFile === newFile2) - newFile.delete() - } - - private def checkSegments(segment1: FileSegment, segment2: FileSegment) { - assert (segment1.file.getCanonicalPath === segment2.file.getCanonicalPath) - assert (segment1.offset === segment2.offset) - assert (segment1.length === segment2.length) - } - - test("consolidated shuffle can write to shuffle group without messing existing offsets/lengths") { - - val serializer = new JavaSerializer(testConf) - val confCopy = testConf.clone - // reset after EACH object write. This is to ensure that there are bytes appended after - // an object is written. So if the codepaths assume writeObject is end of data, this should - // flush those bugs out. This was common bug in ExternalAppendOnlyMap, etc. - confCopy.set("spark.serializer.objectStreamReset", "1") - - val securityManager = new org.apache.spark.SecurityManager(confCopy) - // Do not use the shuffleBlockManager above ! - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, confCopy, - securityManager) - val master = new BlockManagerMaster( - actorSystem.actorOf(Props(new BlockManagerMasterActor(true, confCopy, new LiveListenerBus))), - confCopy) - val store = new BlockManager("<driver>", actorSystem, master , serializer, confCopy, - securityManager, null, shuffleManager) - - try { - - val shuffleManager = store.shuffleBlockManager - - val shuffle1 = shuffleManager.forMapTask(1, 1, 1, serializer, new ShuffleWriteMetrics) - for (writer <- shuffle1.writers) { - writer.write("test1") - writer.write("test2") - } - for (writer <- shuffle1.writers) { - writer.commitAndClose() - } - - val shuffle1Segment = shuffle1.writers(0).fileSegment() - shuffle1.releaseWriters(success = true) - - val shuffle2 = shuffleManager.forMapTask(1, 2, 1, new JavaSerializer(testConf), - new ShuffleWriteMetrics) - - for (writer <- shuffle2.writers) { - writer.write("test3") - writer.write("test4") - } - for (writer <- shuffle2.writers) { - writer.commitAndClose() - } - val shuffle2Segment = shuffle2.writers(0).fileSegment() - shuffle2.releaseWriters(success = true) - - // Now comes the test : - // Write to shuffle 3; and close it, but before registering it, check if the file lengths for - // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length - // of block based on remaining data in file : which could mess things up when there is concurrent read - // and writes happening to the same shuffle group. - - val shuffle3 = shuffleManager.forMapTask(1, 3, 1, new JavaSerializer(testConf), - new ShuffleWriteMetrics) - for (writer <- shuffle3.writers) { - writer.write("test3") - writer.write("test4") - } - for (writer <- shuffle3.writers) { - writer.commitAndClose() - } - // check before we register. - checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0))) - shuffle3.releaseWriters(success = true) - checkSegments(shuffle2Segment, shuffleManager.getBlockLocation(ShuffleBlockId(1, 2, 0))) - shuffleManager.removeShuffle(1) - } finally { - - if (store != null) { - store.stop() - } - actorSystem.shutdown() - actorSystem.awaitTermination() - } - } - - def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) { - val segment = diskBlockManager.getBlockLocation(blockId) - assert(segment.file.getName === filename) - assert(segment.offset === offset) - assert(segment.length === length) - } - def writeToFile(file: File, numBytes: Int) { val writer = new FileWriter(file, true) for (i <- 0 until numBytes) writer.write(i) |