author     Patrick Wendell <pwendell@gmail.com>    2014-04-27 17:40:56 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-04-27 17:40:56 -0700
commit     6b3c6e5dd8e74435f71ecdb224db532550ef407b (patch)
tree       9ff06e8ce1d8dd01b2ccb5e4820adbdd02310298
parent     3d9fb09681308abd2066d0d02f2438f5a17c9dd9 (diff)
SPARK-1145: Memory mapping with many small blocks can cause JVM allocation failures
This includes some minor code clean-up as well. The main change is that small files
are not memory mapped. There is a nicer way to write that code block using Scala's
`Try` but to make it easy to back port and as simple as possible, I opted for the
more explicit but less pretty format.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #43 from pwendell/block-iter-logging and squashes the following commits:

1cff512 [Patrick Wendell] Small issue from merge.
49f6c269 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into block-iter-logging
4943351 [Patrick Wendell] Added a test and feedback on mateis review
a637a18 [Patrick Wendell] Review feedback and adding rewind() when reading byte buffers.
b76b95f [Patrick Wendell] Review feedback
4e1514e [Patrick Wendell] Don't memory map for small files
d238b88 [Patrick Wendell] Some logging and clean-up
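The `Try`-based alternative mentioned above is not part of this patch; the sketch below only illustrates what it could look like, with the segment offset, length, and threshold passed in explicitly (the helper name and signature are illustrative, not from the patch):

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode
import scala.util.Try

object DiskReadSketch {
  // Sketch only, not the committed code: the small-file fallback expressed
  // with scala.util.Try instead of the explicit try/finally in the patch.
  def readSegment(channel: FileChannel, offset: Long, length: Long,
      minMemoryMapBytes: Long): Try[ByteBuffer] = {
    val result = Try {
      if (length < minMemoryMapBytes) {
        // Small segment: read straight into a heap buffer instead of mmap-ing.
        val buf = ByteBuffer.allocate(length.toInt)
        channel.read(buf, offset)
        buf.flip()
        buf
      } else {
        // Large segment: memory map as before.
        channel.map(MapMode.READ_ONLY, offset, length)
      }
    }
    channel.close()
    result
  }
}

Because `Try` captures a failed read as a `Failure` value instead of rethrowing, this variant does not propagate errors exactly like the committed try/finally form, which is part of why the more explicit version was kept for easy backporting.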
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala  | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala           |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala              | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                     |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala      | 58
-rw-r--r--  docs/configuration.md                                                      |  9
6 files changed, 91 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index ace9cd51c9..a02dd9441d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -148,6 +148,12 @@ object BlockFetcherIterator {
}
protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
+ // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
+ // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
+ // nodes, rather than blocking on reading output from one node.
+ val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
+ logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)
+
// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
// at most maxBytesInFlight in order to limit the amount of data in flight.
val remoteRequests = new ArrayBuffer[FetchRequest]
@@ -159,11 +165,6 @@ object BlockFetcherIterator {
_numBlocksToFetch += localBlocksToFetch.size
} else {
numRemote += blockInfos.size
- // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
- // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
- // nodes, rather than blocking on reading output from one node.
- val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
- logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = new ArrayBuffer[(BlockId, Long)]
@@ -178,11 +179,12 @@ object BlockFetcherIterator {
} else if (size < 0) {
throw new BlockException(blockId, "Negative block size " + size)
}
- if (curRequestSize >= minRequestSize) {
+ if (curRequestSize >= targetRequestSize) {
// Add this FetchRequest
remoteRequests += new FetchRequest(address, curBlocks)
curRequestSize = 0
curBlocks = new ArrayBuffer[(BlockId, Long)]
+ logDebug(s"Creating fetch request of $curRequestSize at $address")
}
}
// Add in the final request
@@ -191,7 +193,7 @@ object BlockFetcherIterator {
}
}
}
- logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
+ logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
totalBlocks + " blocks")
remoteRequests
}
@@ -226,8 +228,8 @@ object BlockFetcherIterator {
sendRequest(fetchRequests.dequeue())
}
- val numGets = remoteRequests.size - fetchRequests.size
- logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime))
+ val numFetches = remoteRequests.size - fetchRequests.size
+ logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
// Get Local Blocks
startTime = System.currentTimeMillis
@@ -327,7 +329,7 @@ object BlockFetcherIterator {
}
copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
- logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
+ logInfo("Started " + fetchRequestsSync.size + " remote fetches in " +
Utils.getUsedTimeMs(startTime))
// Get Local Blocks
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 02ba5ecf52..6d7d4f922e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -46,11 +46,12 @@ private[spark] class BlockManager(
val master: BlockManagerMaster,
val defaultSerializer: Serializer,
maxMemory: Long,
- val conf: SparkConf,
+ val _conf: SparkConf,
securityManager: SecurityManager,
mapOutputTracker: MapOutputTracker)
extends Logging {
+ def conf = _conf
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 36ee4bcc41..0ab9fad422 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -33,6 +33,8 @@ import org.apache.spark.util.Utils
private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
extends BlockStore(blockManager) with Logging {
+ val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
+
override def getSize(blockId: BlockId): Long = {
diskManager.getBlockLocation(blockId).length
}
@@ -94,12 +96,20 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val segment = diskManager.getBlockLocation(blockId)
val channel = new RandomAccessFile(segment.file, "r").getChannel()
- val buffer = try {
- channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
+
+ try {
+ // For small files, directly read rather than memory map
+ if (segment.length < minMemoryMapBytes) {
+ val buf = ByteBuffer.allocate(segment.length.toInt)
+ channel.read(buf, segment.offset)
+ buf.flip()
+ Some(buf)
+ } else {
+ Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
+ }
} finally {
channel.close()
}
- Some(buffer)
}
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5a55e7df34..b678604ff8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -553,8 +553,7 @@ private[spark] object Utils extends Logging {
}
/**
- * Return the string to tell how long has passed in seconds. The passing parameter should be in
- * millisecond.
+ * Return the string to tell how long has passed in milliseconds.
*/
def getUsedTimeMs(startTimeMs: Long): String = {
" " + (System.currentTimeMillis - startTimeMs) + " ms"
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 907428db80..00deecc1c3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,12 +17,15 @@
package org.apache.spark.storage
-import java.nio.ByteBuffer
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.Arrays
import akka.actor._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.FunSuite
-import org.scalatest.PrivateMethodTester
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers._
@@ -785,6 +788,53 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
+ test("reads of memory-mapped and non memory-mapped files are equivalent") {
+ val confKey = "spark.storage.memoryMapThreshold"
+
+ // Create a non-trivial (not all zeros) byte array
+ var counter = 0.toByte
+ def incr = {counter = (counter + 1).toByte; counter;}
+ val bytes = Array.fill[Byte](1000)(incr)
+ val byteBuffer = ByteBuffer.wrap(bytes)
+
+ val blockId = BlockId("rdd_1_2")
+
+ // This sequence of mocks makes these tests fairly brittle. It would
+ // be nice to refactor classes involved in disk storage in a way that
+ // allows for easier testing.
+ val blockManager = mock(classOf[BlockManager])
+ val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
+ when(shuffleBlockManager.conf).thenReturn(conf)
+ val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
+ System.getProperty("java.io.tmpdir"))
+
+ when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
+ val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
+ diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
+ val mapped = diskStoreMapped.getBytes(blockId).get
+
+ when(blockManager.conf).thenReturn(conf.clone.set(confKey, (1000 * 1000).toString))
+ val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
+ diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
+ val notMapped = diskStoreNotMapped.getBytes(blockId).get
+
+ // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
+ assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+ "Expected HeapByteBuffer for un-mapped read")
+ assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
+
+ def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
+ val array = new Array[Byte](in.remaining())
+ in.get(array)
+ array
+ }
+
+ val mappedAsArray = arrayFromByteBuffer(mapped)
+ val notMappedAsArray = arrayFromByteBuffer(notMapped)
+ assert(Arrays.equals(mappedAsArray, bytes))
+ assert(Arrays.equals(notMappedAsArray, bytes))
+ }
+
test("updated block statuses") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)
diff --git a/docs/configuration.md b/docs/configuration.md
index 8d3442625b..b078c7c111 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -132,6 +132,15 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.storage.memoryMapThreshold</td>
+ <td>8192</td>
+ <td>
+ Size of a block, in bytes, above which Spark memory maps when reading a block from disk.
+ This prevents Spark from memory mapping very small blocks. In general, memory
+ mapping has high overhead for blocks close to or below the page size of the operating system.
+ </td>
+</tr>
+<tr>
<td>spark.tachyonStore.baseDir</td>
<td>System.getProperty("java.io.tmpdir")</td>
<td>
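As a usage note, the `spark.storage.memoryMapThreshold` property documented above can be set like any other Spark configuration property; a minimal, illustrative sketch follows (the 1 MB value is arbitrary, chosen only for the example):

import org.apache.spark.SparkConf

object MemoryMapThresholdExample {
  // Illustration only: raise the threshold to 1 MB so that blocks smaller
  // than this are read into heap buffers rather than memory mapped.
  val conf = new SparkConf()
    .setAppName("memory-map-threshold-example")
    .set("spark.storage.memoryMapThreshold", (1024 * 1024).toString)
}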