author     Josh Rosen <joshrosen@databricks.com>    2016-04-01 14:34:59 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2016-04-01 14:34:59 -0700
commit     e41acb757327e3226ffe312766ec759c16616588 (patch)
tree       94d57d0c6c4d3177be7ecb541535c620de2c054c /core/src/main/scala/org/apache
parent     bd7b91cefb0d192d808778e6182dcdd2c143e132 (diff)
[SPARK-13992] Add support for off-heap caching
This patch adds support for caching blocks in the executor processes using direct / off-heap memory.

## User-facing changes

**Updated semantics of `OFF_HEAP` storage level**: In Spark 1.x, the `OFF_HEAP` storage level indicated that an RDD should be cached in Tachyon. Spark 2.x removed the external block store API that Tachyon caching was based on (see #10752 / SPARK-12667), so `OFF_HEAP` became an alias for `MEMORY_ONLY_SER`. As of this patch, `OFF_HEAP` means "serialized and cached in off-heap memory or on disk". Via the `StorageLevel` constructor, `useOffHeap` can be set if `serialized == true` and can be used to construct custom storage levels which support replication.

**Storage UI reporting**: the storage UI will now report whether in-memory blocks are stored on- or off-heap.

**Only supported by UnifiedMemoryManager**: for simplicity, this feature is only supported when the default UnifiedMemoryManager is used; applications which use the legacy memory manager (`spark.memory.useLegacyMode=true`) cannot currently allocate off-heap storage memory, so off-heap caching will fail with an error when legacy memory management is enabled. Given that we plan to eventually remove the legacy memory manager, this is not a significant restriction.

**Memory management policies**: the policies for dividing available memory between execution and storage are the same for both on- and off-heap memory. For off-heap memory, the total amount of memory available for use by Spark is controlled by `spark.memory.offHeap.size`, which is an absolute size. Off-heap storage memory obeys `spark.memory.storageFraction` in order to control the amount of unevictable storage memory. For example, if `spark.memory.offHeap.size` is 1 gigabyte and Spark uses the default `storageFraction` of 0.5, then up to 500 megabytes of off-heap cached blocks will be protected from eviction due to execution memory pressure. If necessary, we can split `spark.memory.storageFraction` into separate on- and off-heap configurations, but this doesn't seem necessary now and can be done later without any breaking changes.

**Use of off-heap memory does not imply use of off-heap execution (or vice-versa)**: for now, the settings controlling the use of off-heap execution memory (`spark.memory.offHeap.enabled`) and off-heap caching are completely independent, so Spark SQL can be configured to use off-heap memory for execution while continuing to cache blocks on-heap. If desired, we can change this in a followup patch so that `spark.memory.offHeap.enabled` affects the default storage level for cached SQL tables.

## Internal changes

- Rename `ByteArrayChunkOutputStream` to `ChunkedByteBufferOutputStream` (a usage sketch follows the diffstat below):
  - It now returns a `ChunkedByteBuffer` instead of an array of byte arrays.
  - Its constructor now accepts an `allocator` function which is called to allocate `ByteBuffer`s. This allows us to control whether it allocates regular ByteBuffers or off-heap DirectByteBuffers.
  - Because block serialization is now performed during the unroll process, a `ChunkedByteBufferOutputStream` which is configured with a `DirectByteBuffer` allocator will use off-heap memory for both unroll and storage memory.
- The `MemoryStore`'s `MemoryEntry` instances now track whether blocks are stored on- or off-heap.
- `evictBlocksToFreeSpace()` now accepts a `MemoryMode` parameter so that we don't try to evict off-heap blocks in response to on-heap memory pressure (or vice-versa).
- Make sure that off-heap buffers are properly de-allocated during MemoryStore eviction.
- The JVM limits the total size of allocated direct byte buffers using the `-XX:MaxDirectMemorySize` flag and the default tends to be fairly low (< 512 megabytes in some JVMs). To work around this limitation, this patch adds a custom DirectByteBuffer allocator which ignores this memory limit.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11805 from JoshRosen/off-heap-caching.
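To make the user-facing behavior above concrete, here is a minimal sketch of how an application might opt into off-heap caching after this patch. The configuration keys and the five-argument `StorageLevel` factory are taken from the commit message and the `StorageLevel.scala` changes below; the application name, RDD, and variable names are hypothetical, and the snippet is illustrative rather than a definitive setup.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical driver configuration. Per the commit message, off-heap execution
// (spark.memory.offHeap.enabled) and off-heap caching are independent settings;
// spark.memory.offHeap.size controls the total off-heap pool used by Spark.
val conf = new SparkConf()
  .setAppName("off-heap-caching-example")                 // hypothetical app name
  .set("spark.memory.offHeap.size", (1L << 30).toString)  // 1 GiB, expressed in bytes
  .set("spark.memory.offHeap.enabled", "true")            // off-heap *execution* memory (independent)
// With the default spark.memory.storageFraction of 0.5, roughly half of that
// 1 GiB of off-heap cached blocks is protected from eviction by execution pressure.

val sc = new SparkContext(conf)

// OFF_HEAP now means "serialized, cached in off-heap memory, with disk fallback".
val cached = sc.parallelize(1 to 1000000).persist(StorageLevel.OFF_HEAP)

// Custom off-heap levels (e.g. with replication) can be built via the factory,
// as long as the data is serialized (deserialized = false).
val offHeapReplicated = StorageLevel(
  useDisk = true, useMemory = true, useOffHeap = true,
  deserialized = false, replication = 2)
```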
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala                |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala                   |  22
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala                             |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala               |  16
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                       |  70
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala         |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala                       |  21
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala                 | 153
-rw-r--r--  core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala                  |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala (renamed from core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala) |  38
10 files changed, 223 insertions, 126 deletions
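As a companion to the `ChunkedByteBufferOutputStream` rename described under Internal changes, the sketch below shows how the pluggable allocator selects between heap and direct buffers, mirroring the `MemoryMode` pattern match used in `MemoryStore` and `BlockManager` in this diff. It is a hedged sketch: these util classes are `private[spark]`, so this would only compile from within Spark's own packages (for example, test code), and the object and method names are hypothetical.

```scala
// Hypothetical location inside Spark's package hierarchy for private[spark] access.
package org.apache.spark.util.io

import java.nio.ByteBuffer

import org.apache.spark.memory.MemoryMode
import org.apache.spark.unsafe.Platform

object ChunkedOutputStreamSketch {  // hypothetical object name
  def serializeToChunks(payload: Array[Byte], memoryMode: MemoryMode): ChunkedByteBuffer = {
    // Choose the allocator by memory mode, as MemoryStore.putIteratorAsBytes() does below.
    val allocator: Int => ByteBuffer = memoryMode match {
      case MemoryMode.ON_HEAP  => ByteBuffer.allocate _            // regular heap ByteBuffers
      case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _  // direct (off-heap) buffers
    }
    // 4 MiB chunks, matching the chunk size used by SerializerManager.dataSerialize().
    val out = new ChunkedByteBufferOutputStream(4 * 1024 * 1024, allocator)
    out.write(payload)
    out.close()
    // toChunkedByteBuffer() may only be called once; further writes are rejected.
    out.toChunkedByteBuffer
  }
}
```

If the off-heap allocator is used, the resulting `ChunkedByteBuffer` holds direct buffers and should eventually be freed with `dispose()`, which is what the eviction path in this patch takes care of.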
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index e5e6a9e4a8..632b0ae9c2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{ByteBufferInputStream, Utils}
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
/**
* A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -228,12 +228,12 @@ private object TorrentBroadcast extends Logging {
blockSize: Int,
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
- val bos = new ByteArrayChunkOutputStream(blockSize)
- val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
+ val cbbos = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
+ val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos)
val ser = serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject[T](obj).close()
- bos.toArrays.map(ByteBuffer.wrap)
+ cbbos.toChunkedByteBuffer.getChunks()
}
def unBlockifyObject[T: ClassTag](
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index a67e8da26b..0b552cabfc 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -35,6 +35,11 @@ private[memory] class StorageMemoryPool(
memoryMode: MemoryMode
) extends MemoryPool(lock) with Logging {
+ private[this] val poolName: String = memoryMode match {
+ case MemoryMode.ON_HEAP => "on-heap storage"
+ case MemoryMode.OFF_HEAP => "off-heap storage"
+ }
+
@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L
@@ -60,7 +65,7 @@ private[memory] class StorageMemoryPool(
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
- *
+ *
* @return whether all N bytes were successfully granted.
*/
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
@@ -83,9 +88,8 @@ private[memory] class StorageMemoryPool(
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
- // Once we support off-heap caching, this will need to change:
- if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
- memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
+ if (numBytesToFree > 0) {
+ memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
@@ -122,14 +126,8 @@ private[memory] class StorageMemoryPool(
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
- val spaceFreedByEviction = {
- // Once we support off-heap caching, this will need to change:
- if (memoryMode == MemoryMode.ON_HEAP) {
- memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
- } else {
- 0
- }
- }
+ val spaceFreedByEviction =
+ memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d2b8ca90a9..46c64f61de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.{Accumulator, SparkEnv, TaskContext, TaskContextImpl}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
@@ -90,7 +90,8 @@ private[spark] abstract class Task[T](
try {
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
- SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+ SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
+ SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
// Notify any tasks waiting for execution memory to be freed to wake up and try to
// acquire memory again. This makes impossible the scenario where a task sleeps forever
// because there are no other tasks left to notify it. Since this is safe to do but may
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 27e5fa4c2b..745ef12691 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage._
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
/**
* Component which configures serialization and compression for various Spark components, including
@@ -128,17 +128,9 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
/** Serializes into a chunked byte buffer. */
def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
- val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
- dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
- new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
- }
-
- /**
- * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
- * the iterator is reached.
- */
- def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[T] = {
- dataDeserializeStream[T](blockId, bytes.toInputStream(dispose = true))
+ val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
+ dataSerializeStream(blockId, bbos, values)
+ bbos.toChunkedByteBuffer
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3014cafc28..9608418b43 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -18,6 +18,7 @@
package org.apache.spark.storage
import java.io._
+import java.nio.ByteBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -39,6 +40,7 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
+import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -372,8 +374,12 @@ private[spark] class BlockManager(
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || onDisk) level.replication else 1
- val storageLevel =
- StorageLevel(onDisk, inMem, deserialized, replication)
+ val storageLevel = StorageLevel(
+ useDisk = onDisk,
+ useMemory = inMem,
+ useOffHeap = level.useOffHeap,
+ deserialized = deserialized,
+ replication = replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
BlockStatus(storageLevel, memSize, diskSize)
@@ -407,8 +413,8 @@ private[spark] class BlockManager(
val iter: Iterator[Any] = if (level.deserialized) {
memoryStore.getValues(blockId).get
} else {
- serializerManager.dataDeserialize(
- blockId, memoryStore.getBytes(blockId).get)(info.classTag)
+ serializerManager.dataDeserializeStream(
+ blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
}
val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
@@ -416,11 +422,15 @@ private[spark] class BlockManager(
val iterToReturn: Iterator[Any] = {
val diskBytes = diskStore.getBytes(blockId)
if (level.deserialized) {
- val diskValues = serializerManager.dataDeserialize(blockId, diskBytes)(info.classTag)
+ val diskValues = serializerManager.dataDeserializeStream(
+ blockId,
+ diskBytes.toInputStream(dispose = true))(info.classTag)
maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
} else {
- val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
- serializerManager.dataDeserialize(blockId, bytes)(info.classTag)
+ val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
+ .map {_.toInputStream(dispose = false)}
+ .getOrElse { diskBytes.toInputStream(dispose = true) }
+ serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
}
}
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
@@ -481,7 +491,8 @@ private[spark] class BlockManager(
if (level.useMemory && memoryStore.contains(blockId)) {
memoryStore.getBytes(blockId).get
} else if (level.useDisk && diskStore.contains(blockId)) {
- maybeCacheDiskBytesInMemory(info, blockId, level, diskStore.getBytes(blockId))
+ val diskBytes = diskStore.getBytes(blockId)
+ maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
} else {
releaseLock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
@@ -496,8 +507,9 @@ private[spark] class BlockManager(
*/
private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
getRemoteBytes(blockId).map { data =>
- new BlockResult(
- serializerManager.dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
+ val values =
+ serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+ new BlockResult(values, DataReadMethod.Network, data.size)
}
}
@@ -745,7 +757,8 @@ private[spark] class BlockManager(
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
- val values = serializerManager.dataDeserialize(blockId, bytes)(classTag)
+ val values =
+ serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
memoryStore.putIteratorAsValues(blockId, values, classTag) match {
case Right(_) => true
case Left(iter) =>
@@ -755,7 +768,7 @@ private[spark] class BlockManager(
false
}
} else {
- memoryStore.putBytes(blockId, size, () => bytes)
+ memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
}
if (!putSucceeded && level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
@@ -893,7 +906,7 @@ private[spark] class BlockManager(
}
}
} else { // !level.deserialized
- memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
+ memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
case Right(s) =>
size = s
case Left(partiallySerializedValues) =>
@@ -951,14 +964,16 @@ private[spark] class BlockManager(
* Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up
* subsequent reads. This method requires the caller to hold a read lock on the block.
*
- * @return a copy of the bytes. The original bytes passed this method should no longer
- * be used after this method returns.
+ * @return a copy of the bytes from the memory store if the put succeeded, otherwise None.
+ * If this returns bytes from the memory store then the original disk store bytes will
+ * automatically be disposed and the caller should not continue to use them. Otherwise,
+ * if this returns None then the original disk store bytes will be unaffected.
*/
private def maybeCacheDiskBytesInMemory(
blockInfo: BlockInfo,
blockId: BlockId,
level: StorageLevel,
- diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = {
+ diskBytes: ChunkedByteBuffer): Option[ChunkedByteBuffer] = {
require(!level.deserialized)
if (level.useMemory) {
// Synchronize on blockInfo to guard against a race condition where two readers both try to
@@ -966,25 +981,29 @@ private[spark] class BlockManager(
blockInfo.synchronized {
if (memoryStore.contains(blockId)) {
diskBytes.dispose()
- memoryStore.getBytes(blockId).get
+ Some(memoryStore.getBytes(blockId).get)
} else {
- val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => {
+ val allocator = level.memoryMode match {
+ case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+ case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+ }
+ val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, level.memoryMode, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we
// cannot put it into MemoryStore, copyForMemory should not be created. That's why
// this action is put into a `() => ChunkedByteBuffer` and created lazily.
- diskBytes.copy()
+ diskBytes.copy(allocator)
})
if (putSucceeded) {
diskBytes.dispose()
- memoryStore.getBytes(blockId).get
+ Some(memoryStore.getBytes(blockId).get)
} else {
- diskBytes
+ None
}
}
}
} else {
- diskBytes
+ None
}
}
@@ -1055,7 +1074,12 @@ private[spark] class BlockManager(
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
- val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
+ val tLevel = StorageLevel(
+ useDisk = level.useDisk,
+ useMemory = level.useMemory,
+ useOffHeap = level.useOffHeap,
+ deserialized = level.deserialized,
+ replication = 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d2a5c69e15..8fa1215011 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -453,7 +453,7 @@ private[spark] class BlockManagerInfo(
}
if (storageLevel.isValid) {
- /* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
+ /* isValid means it is either stored in-memory or on-disk.
* The memSize here indicates the data size in or dropped from memory,
* externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
* and the diskSize here indicates the data size in or dropped to disk.
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 7d23295e25..216ec07934 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -60,10 +60,7 @@ class StorageLevel private(
assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")
if (useOffHeap) {
- require(!useDisk, "Off-heap storage level does not support using disk")
- require(!useMemory, "Off-heap storage level does not support using heap memory")
require(!deserialized, "Off-heap storage level does not support deserialized storage")
- require(replication == 1, "Off-heap storage level does not support multiple replication")
}
private[spark] def memoryMode: MemoryMode = {
@@ -86,7 +83,7 @@ class StorageLevel private(
false
}
- def isValid: Boolean = (useMemory || useDisk || useOffHeap) && (replication > 0)
+ def isValid: Boolean = (useMemory || useDisk) && (replication > 0)
def toInt: Int = {
var ret = 0
@@ -123,7 +120,8 @@ class StorageLevel private(
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
override def toString: String = {
- s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)"
+ s"StorageLevel(disk=$useDisk, memory=$useMemory, offheap=$useOffHeap, " +
+ s"deserialized=$deserialized, replication=$replication)"
}
override def hashCode(): Int = toInt * 41 + replication
@@ -131,8 +129,9 @@ class StorageLevel private(
def description: String = {
var result = ""
result += (if (useDisk) "Disk " else "")
- result += (if (useMemory) "Memory " else "")
- result += (if (useOffHeap) "ExternalBlockStore " else "")
+ if (useMemory) {
+ result += (if (useOffHeap) "Memory (off heap) " else "Memory ")
+ }
result += (if (deserialized) "Deserialized " else "Serialized ")
result += s"${replication}x Replicated"
result
@@ -156,9 +155,7 @@ object StorageLevel {
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
-
- // Redirect to MEMORY_ONLY_SER for now.
- val OFF_HEAP = MEMORY_ONLY_SER
+ val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
/**
* :: DeveloperApi ::
@@ -183,7 +180,7 @@ object StorageLevel {
/**
* :: DeveloperApi ::
- * Create a new StorageLevel object without setting useOffHeap.
+ * Create a new StorageLevel object.
*/
@DeveloperApi
def apply(
@@ -198,7 +195,7 @@ object StorageLevel {
/**
* :: DeveloperApi ::
- * Create a new StorageLevel object.
+ * Create a new StorageLevel object without setting useOffHeap.
*/
@DeveloperApi
def apply(
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 3ca41f32c1..df38d11e43 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -32,20 +32,25 @@ import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
private sealed trait MemoryEntry[T] {
def size: Long
+ def memoryMode: MemoryMode
def classTag: ClassTag[T]
}
private case class DeserializedMemoryEntry[T](
value: Array[T],
size: Long,
- classTag: ClassTag[T]) extends MemoryEntry[T]
+ classTag: ClassTag[T]) extends MemoryEntry[T] {
+ val memoryMode: MemoryMode = MemoryMode.ON_HEAP
+}
private case class SerializedMemoryEntry[T](
buffer: ChunkedByteBuffer,
+ memoryMode: MemoryMode,
classTag: ClassTag[T]) extends MemoryEntry[T] {
def size: Long = buffer.size
}
@@ -86,7 +91,10 @@ private[spark] class MemoryStore(
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
// All accesses of this map are assumed to have manually synchronized on `memoryManager`
- private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+ private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
+ // Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching
+ // always stores serialized values.
+ private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
// Initial memory to request before unrolling any block
private val unrollMemoryThreshold: Long =
@@ -131,13 +139,14 @@ private[spark] class MemoryStore(
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
+ memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) {
+ if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
- val entry = new SerializedMemoryEntry[T](bytes, implicitly[ClassTag[T]])
+ val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
@@ -190,7 +199,8 @@ private[spark] class MemoryStore(
var vector = new SizeTrackingVector[T]()(classTag)
// Request enough memory to begin unrolling
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+ keepUnrolling =
+ reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
@@ -207,7 +217,8 @@ private[spark] class MemoryStore(
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+ keepUnrolling =
+ reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
@@ -228,7 +239,7 @@ private[spark] class MemoryStore(
def transferUnrollToStorage(amount: Long): Unit = {
// Synchronize so that transfer is atomic
memoryManager.synchronized {
- releaseUnrollMemoryForThisTask(amount)
+ releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
@@ -247,7 +258,7 @@ private[spark] class MemoryStore(
// If this task attempt already owns more unroll memory than is necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
- releaseUnrollMemoryForThisTask(excessUnrollMemory)
+ releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
@@ -295,10 +306,16 @@ private[spark] class MemoryStore(
private[storage] def putIteratorAsBytes[T](
blockId: BlockId,
values: Iterator[T],
- classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+ classTag: ClassTag[T],
+ memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
+ val allocator = memoryMode match {
+ case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+ case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+ }
+
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
@@ -307,15 +324,15 @@ private[spark] class MemoryStore(
var unrollMemoryUsedByThisBlock = 0L
// Underlying buffer for unrolling the block
val redirectableStream = new RedirectableOutputStream
- val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
- redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+ val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
+ redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val ser = serializerManager.getSerializer(classTag).newInstance()
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}
// Request enough memory to begin unrolling
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
@@ -325,9 +342,9 @@ private[spark] class MemoryStore(
}
def reserveAdditionalMemoryIfNecessary(): Unit = {
- if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
- val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+ if (bbos.size > unrollMemoryUsedByThisBlock) {
+ val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
@@ -349,12 +366,11 @@ private[spark] class MemoryStore(
}
if (keepUnrolling) {
- val entry = SerializedMemoryEntry[T](
- new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)), classTag)
+ val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
// Synchronize so that transfer is atomic
memoryManager.synchronized {
- releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
- val success = memoryManager.acquireStorageMemory(blockId, entry.size, MemoryMode.ON_HEAP)
+ releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
+ val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
assert(success, "transferring unroll memory to storage memory failed")
}
entries.synchronized {
@@ -365,7 +381,7 @@ private[spark] class MemoryStore(
Right(entry.size)
} else {
// We ran out of space while unrolling the values for this block
- logUnrollFailureMessage(blockId, byteArrayChunkOutputStream.size)
+ logUnrollFailureMessage(blockId, bbos.size)
Left(
new PartiallySerializedBlock(
this,
@@ -374,7 +390,8 @@ private[spark] class MemoryStore(
serializationStream,
redirectableStream,
unrollMemoryUsedByThisBlock,
- new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)),
+ memoryMode,
+ bbos.toChunkedByteBuffer,
values,
classTag))
}
@@ -386,7 +403,7 @@ private[spark] class MemoryStore(
case null => None
case e: DeserializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getBytes on serialized blocks")
- case SerializedMemoryEntry(bytes, _) => Some(bytes)
+ case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
}
}
@@ -407,8 +424,12 @@ private[spark] class MemoryStore(
entries.remove(blockId)
}
if (entry != null) {
- memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP)
- logInfo(s"Block $blockId of size ${entry.size} dropped " +
+ entry match {
+ case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+ case _ =>
+ }
+ memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
+ logDebug(s"Block $blockId of size ${entry.size} dropped " +
s"from memory (free ${maxMemory - blocksMemoryUsed})")
true
} else {
@@ -420,7 +441,8 @@ private[spark] class MemoryStore(
entries.synchronized {
entries.clear()
}
- unrollMemoryMap.clear()
+ onHeapUnrollMemoryMap.clear()
+ offHeapUnrollMemoryMap.clear()
memoryManager.releaseAllStorageMemory()
logInfo("MemoryStore cleared")
}
@@ -440,16 +462,20 @@ private[spark] class MemoryStore(
*
* @param blockId the ID of the block we are freeing space for, if any
* @param space the size of this block
+ * @param memoryMode the type of memory to free (on- or off-heap)
* @return the amount of memory (in bytes) freed by eviction
*/
- private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = {
+ private[spark] def evictBlocksToFreeSpace(
+ blockId: Option[BlockId],
+ space: Long,
+ memoryMode: MemoryMode): Long = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
- def blockIsEvictable(blockId: BlockId): Boolean = {
- rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
+ def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
+ entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
}
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
@@ -459,7 +485,8 @@ private[spark] class MemoryStore(
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
- if (blockIsEvictable(blockId)) {
+ val entry = pair.getValue
+ if (blockIsEvictable(blockId, entry)) {
// We don't want to evict blocks which are currently being read, so we need to obtain
// an exclusive write lock on blocks which are candidates for eviction. We perform a
// non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
@@ -474,7 +501,7 @@ private[spark] class MemoryStore(
def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
val data = entry match {
case DeserializedMemoryEntry(values, _, _) => Left(values)
- case SerializedMemoryEntry(buffer, _) => Right(buffer)
+ case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
}
val newEffectiveStorageLevel =
blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
@@ -530,11 +557,18 @@ private[spark] class MemoryStore(
*
* @return whether the request is granted.
*/
- def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
+ def reserveUnrollMemoryForThisTask(
+ blockId: BlockId,
+ memory: Long,
+ memoryMode: MemoryMode): Boolean = {
memoryManager.synchronized {
- val success = memoryManager.acquireUnrollMemory(blockId, memory, MemoryMode.ON_HEAP)
+ val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
if (success) {
val taskAttemptId = currentTaskAttemptId()
+ val unrollMemoryMap = memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
+ case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
+ }
unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
}
success
@@ -545,9 +579,13 @@ private[spark] class MemoryStore(
* Release memory used by this task for unrolling blocks.
* If the amount is not specified, remove the current task's allocation altogether.
*/
- def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
+ def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
val taskAttemptId = currentTaskAttemptId()
memoryManager.synchronized {
+ val unrollMemoryMap = memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
+ case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
+ }
if (unrollMemoryMap.contains(taskAttemptId)) {
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
@@ -555,7 +593,7 @@ private[spark] class MemoryStore(
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
}
- memoryManager.releaseUnrollMemory(memoryToRelease, MemoryMode.ON_HEAP)
+ memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
}
}
}
@@ -565,20 +603,23 @@ private[spark] class MemoryStore(
* Return the amount of memory currently occupied for unrolling blocks across all tasks.
*/
def currentUnrollMemory: Long = memoryManager.synchronized {
- unrollMemoryMap.values.sum
+ onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
}
/**
* Return the amount of memory currently occupied for unrolling blocks by this task.
*/
def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
- unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
+ onHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) +
+ offHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
}
/**
* Return the number of tasks currently unrolling blocks.
*/
- private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size }
+ private def numTasksUnrolling: Int = memoryManager.synchronized {
+ (onHeapUnrollMemoryMap.keys ++ offHeapUnrollMemoryMap.keys).toSet.size
+ }
/**
* Log information about current memory usage.
@@ -627,7 +668,7 @@ private[storage] class PartiallyUnrolledIterator[T](
private[this] var iter: Iterator[T] = {
val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
unrolledIteratorIsConsumed = true
- memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+ memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
})
completionIterator ++ rest
}
@@ -640,7 +681,7 @@ private[storage] class PartiallyUnrolledIterator[T](
*/
def close(): Unit = {
if (!unrolledIteratorIsConsumed) {
- memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+ memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
unrolledIteratorIsConsumed = true
}
iter = null
@@ -669,6 +710,7 @@ private class RedirectableOutputStream extends OutputStream {
* @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
* @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
* @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
+ * @param memoryMode whether the unroll memory is on- or off-heap
* @param unrolled a byte buffer containing the partially-serialized values.
* @param rest the rest of the original iterator passed to
* [[MemoryStore.putIteratorAsValues()]].
@@ -681,18 +723,36 @@ private[storage] class PartiallySerializedBlock[T](
serializationStream: SerializationStream,
redirectableOutputStream: RedirectableOutputStream,
unrollMemory: Long,
+ memoryMode: MemoryMode,
unrolled: ChunkedByteBuffer,
rest: Iterator[T],
classTag: ClassTag[T]) {
+ // If the task does not fully consume `valuesIterator` or otherwise fails to consume or dispose of
+ // this PartiallySerializedBlock then we risk leaking of direct buffers, so we use a task
+ // completion listener here in order to ensure that `unrolled.dispose()` is called at least once.
+ // The dispose() method is idempotent, so it's safe to call it unconditionally.
+ Option(TaskContext.get()).foreach { taskContext =>
+ taskContext.addTaskCompletionListener { _ =>
+ // When a task completes, its unroll memory will automatically be freed. Thus we do not call
+ // releaseUnrollMemoryForThisTask() here because we want to avoid double-freeing.
+ unrolled.dispose()
+ }
+ }
+
/**
* Called to dispose of this block and free its memory.
*/
def discard(): Unit = {
try {
+ // We want to close the output stream in order to free any resources associated with the
+ // serializer itself (such as Kryo's internal buffers). close() might cause data to be
+ // written, so redirect the output stream to discard that data.
+ redirectableOutputStream.setOutputStream(ByteStreams.nullOutputStream())
serializationStream.close()
} finally {
- memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+ unrolled.dispose()
+ memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
}
}
@@ -701,12 +761,14 @@ private[storage] class PartiallySerializedBlock[T](
* and then serializing the values from the original input iterator.
*/
def finishWritingToStream(os: OutputStream): Unit = {
- ByteStreams.copy(unrolled.toInputStream(), os)
+ // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
+ ByteStreams.copy(unrolled.toInputStream(dispose = true), os)
+ memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
redirectableOutputStream.setOutputStream(os)
while (rest.hasNext) {
serializationStream.writeObject(rest.next())(classTag)
}
- discard()
+ serializationStream.close()
}
/**
@@ -717,10 +779,13 @@ private[storage] class PartiallySerializedBlock[T](
* `close()` on it to free its resources.
*/
def valuesIterator: PartiallyUnrolledIterator[T] = {
+ // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
+ val unrolledIter = serializerManager.dataDeserializeStream(
+ blockId, unrolled.toInputStream(dispose = true))(classTag)
new PartiallyUnrolledIterator(
memoryStore,
unrollMemory,
- unrolled = serializerManager.dataDeserialize(blockId, unrolled)(classTag),
+ unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()),
rest = rest)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index c643c4b63c..fb4706e78d 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -41,6 +41,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
+ private[this] var disposed: Boolean = false
+
/**
* This size of this buffer, in bytes.
*/
@@ -117,11 +119,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
/**
* Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers.
* The new buffer will share no resources with the original buffer.
+ *
+ * @param allocator a method for allocating byte buffers
*/
- def copy(): ChunkedByteBuffer = {
+ def copy(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
val copiedChunks = getChunks().map { chunk =>
- // TODO: accept an allocator in this copy method to integrate with mem. accounting systems
- val newChunk = ByteBuffer.allocate(chunk.limit())
+ val newChunk = allocator(chunk.limit())
newChunk.put(chunk)
newChunk.flip()
newChunk
@@ -136,7 +139,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
* unfortunately no standard API to do this.
*/
def dispose(): Unit = {
- chunks.foreach(StorageUtils.dispose)
+ if (!disposed) {
+ chunks.foreach(StorageUtils.dispose)
+ disposed = true
+ }
}
}
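The `copy()` and `dispose()` changes in `ChunkedByteBuffer.scala` above can be seen in isolation with a short REPL-style sketch. This is an assumption-laden illustration, not part of the patch: the class is `private[spark]`, so outside Spark's own packages it is reachable only from test or REPL-style code, and the buffer contents are arbitrary.

```scala
import java.nio.ByteBuffer

import org.apache.spark.unsafe.Platform
import org.apache.spark.util.io.ChunkedByteBuffer

// A tiny one-chunk buffer with arbitrary contents.
val onHeap = new ChunkedByteBuffer(Array(ByteBuffer.wrap(Array[Byte](1, 2, 3))))

// copy() now takes an allocator, so the caller decides where the copy lives:
val directCopy = onHeap.copy(Platform.allocateDirectBuffer _)  // off-heap copy
val heapCopy   = onHeap.copy(ByteBuffer.allocate _)            // on-heap copy

// dispose() is now guarded by the `disposed` flag, so calling it twice is safe
// and the underlying direct memory is freed only once.
directCopy.dispose()
directCopy.dispose()
```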
diff --git a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
index 16fe3be303..67b50d1e70 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
@@ -18,19 +18,25 @@
package org.apache.spark.util.io
import java.io.OutputStream
+import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.storage.StorageUtils
/**
* An OutputStream that writes to fixed-size chunks of byte arrays.
*
* @param chunkSize size of each chunk, in bytes.
*/
-private[spark]
-class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
+private[spark] class ChunkedByteBufferOutputStream(
+ chunkSize: Int,
+ allocator: Int => ByteBuffer)
+ extends OutputStream {
- private[this] val chunks = new ArrayBuffer[Array[Byte]]
+ private[this] var toChunkedByteBufferWasCalled = false
+
+ private val chunks = new ArrayBuffer[ByteBuffer]
/** Index of the last chunk. Starting with -1 when the chunks array is empty. */
private[this] var lastChunkIndex = -1
@@ -48,7 +54,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
override def write(b: Int): Unit = {
allocateNewChunkIfNeeded()
- chunks(lastChunkIndex)(position) = b.toByte
+ chunks(lastChunkIndex).put(b.toByte)
position += 1
_size += 1
}
@@ -58,7 +64,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
while (written < len) {
allocateNewChunkIfNeeded()
val thisBatch = math.min(chunkSize - position, len - written)
- System.arraycopy(bytes, written + off, chunks(lastChunkIndex), position, thisBatch)
+ chunks(lastChunkIndex).put(bytes, written + off, thisBatch)
written += thisBatch
position += thisBatch
}
@@ -67,33 +73,41 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
@inline
private def allocateNewChunkIfNeeded(): Unit = {
+ require(!toChunkedByteBufferWasCalled, "cannot write after toChunkedByteBuffer() is called")
if (position == chunkSize) {
- chunks += new Array[Byte](chunkSize)
+ chunks += allocator(chunkSize)
lastChunkIndex += 1
position = 0
}
}
- def toArrays: Array[Array[Byte]] = {
+ def toChunkedByteBuffer: ChunkedByteBuffer = {
+ require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only be called once")
+ toChunkedByteBufferWasCalled = true
if (lastChunkIndex == -1) {
- new Array[Array[Byte]](0)
+ new ChunkedByteBuffer(Array.empty[ByteBuffer])
} else {
// Copy the first n-1 chunks to the output, and then create an array that fits the last chunk.
// An alternative would have been returning an array of ByteBuffers, with the last buffer
// bounded to only the last chunk's position. However, given our use case in Spark (to put
// the chunks in block manager), only limiting the view bound of the buffer would still
// require the block manager to store the whole chunk.
- val ret = new Array[Array[Byte]](chunks.size)
+ val ret = new Array[ByteBuffer](chunks.size)
for (i <- 0 until chunks.size - 1) {
ret(i) = chunks(i)
+ ret(i).flip()
}
if (position == chunkSize) {
ret(lastChunkIndex) = chunks(lastChunkIndex)
+ ret(lastChunkIndex).flip()
} else {
- ret(lastChunkIndex) = new Array[Byte](position)
- System.arraycopy(chunks(lastChunkIndex), 0, ret(lastChunkIndex), 0, position)
+ ret(lastChunkIndex) = allocator(position)
+ chunks(lastChunkIndex).flip()
+ ret(lastChunkIndex).put(chunks(lastChunkIndex))
+ ret(lastChunkIndex).flip()
+ StorageUtils.dispose(chunks(lastChunkIndex))
}
- ret
+ new ChunkedByteBuffer(ret)
}
}
}