aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-10-10 01:44:36 -0700
committerReynold Xin <rxin@apache.org>2014-10-10 01:44:36 -0700
commit90f73fcc47c7bf881f808653d46a9936f37c3c31 (patch)
treeb1be2d788a28d267bf4fa96eb467954de18aae63
parent411cf29fff011561f0093bb6101af87842828369 (diff)
downloadspark-90f73fcc47c7bf881f808653d46a9936f37c3c31.tar.gz
spark-90f73fcc47c7bf881f808653d46a9936f37c3c31.tar.bz2
spark-90f73fcc47c7bf881f808653d46a9936f37c3c31.zip
[SPARK-3889] Attempt to avoid SIGBUS by not mmapping files in ConnectionManager
In general, individual shuffle blocks are frequently small, so mmapping them often creates a lot of waste. It may not be bad to mmap the larger ones, but it is pretty inconvenient to get configuration into ManagedBuffer, and besides it is unlikely to help all that much. Author: Aaron Davidson <aaron@databricks.com> Closes #2742 from aarondav/mmap and squashes the following commits: a152065 [Aaron Davidson] Add other pathway back 52b6cd2 [Aaron Davidson] [SPARK-3889] Attempt to avoid SIGBUS by not mmapping files in ConnectionManager
-rw-r--r--core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala16
1 files changed, 15 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index a4409181ec..4c9ca97a2a 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -66,13 +66,27 @@ sealed abstract class ManagedBuffer {
final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
extends ManagedBuffer {
+ /**
+ * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
+ * Avoid unless there's a good reason not to.
+ */
+ private val MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
+
override def size: Long = length
override def nioByteBuffer(): ByteBuffer = {
var channel: FileChannel = null
try {
channel = new RandomAccessFile(file, "r").getChannel
- channel.map(MapMode.READ_ONLY, offset, length)
+ // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
+ if (length < MIN_MEMORY_MAP_BYTES) {
+ val buf = ByteBuffer.allocate(length.toInt)
+ channel.read(buf, offset)
+ buf.flip()
+ buf
+ } else {
+ channel.map(MapMode.READ_ONLY, offset, length)
+ }
} catch {
case e: IOException =>
Try(channel.size).toOption match {