 core/src/main/scala/org/apache/spark/storage/BlockManager.scala               |  49
 core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala         | 229
 core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala        |   2
 core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala |  11
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala          |   9
 core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala           | 169
 6 files changed, 392 insertions(+), 77 deletions(-)
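At a high level, the patch splits `MemoryStore.putIterator`, which previously consulted the block's `StorageLevel` to decide between storing values and bytes, into two dedicated entry points, so the serialized path can stream records into chunked buffers instead of materializing an object array first. For orientation, these are the two signatures as they appear in the MemoryStore.scala diff below:

// The two entry points that replace putIterator (signatures from the diff below).
// On failure, each returns a handle whose type reflects how far the put got:
// partially-unrolled values, or partially-serialized bytes that can be resumed.
private[storage] def putIteratorAsValues[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]

private[storage] def putIteratorAsBytes[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long]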
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index eebb43e245..30d2e23efd 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -746,7 +746,7 @@ private[spark] class BlockManager(
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
val values = serializerManager.dataDeserialize(blockId, bytes)(classTag)
- memoryStore.putIterator(blockId, values, level, classTag) match {
+ memoryStore.putIteratorAsValues(blockId, values, classTag) match {
case Right(_) => true
case Left(iter) =>
// If putting deserialized values in memory failed, we will put the bytes directly to
@@ -876,21 +876,40 @@ private[spark] class BlockManager(
if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
- memoryStore.putIterator(blockId, iterator(), level, classTag) match {
- case Right(s) =>
- size = s
- case Left(iter) =>
- // Not enough space to unroll this block; drop to disk if applicable
- if (level.useDisk) {
- logWarning(s"Persisting block $blockId to disk instead.")
- diskStore.put(blockId) { fileOutputStream =>
- serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
+ if (level.deserialized) {
+ memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
+ case Right(s) =>
+ size = s
+ case Left(iter) =>
+ // Not enough space to unroll this block; drop to disk if applicable
+ if (level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ diskStore.put(blockId) { fileOutputStream =>
+ serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
+ }
+ size = diskStore.getSize(blockId)
+ } else {
+ iteratorFromFailedMemoryStorePut = Some(iter)
}
- size = diskStore.getSize(blockId)
- } else {
- iteratorFromFailedMemoryStorePut = Some(iter)
- }
+ }
+ } else { // !level.deserialized
+ memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
+ case Right(s) =>
+ size = s
+ case Left(partiallySerializedValues) =>
+ // Not enough space to unroll this block; drop to disk if applicable
+ if (level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ diskStore.put(blockId) { fileOutputStream =>
+ partiallySerializedValues.finishWritingToStream(fileOutputStream)
+ }
+ size = diskStore.getSize(blockId)
+ } else {
+ iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
+ }
+ }
}
+
} else if (level.useDisk) {
diskStore.put(blockId) { fileOutputStream =>
serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
@@ -991,7 +1010,7 @@ private[spark] class BlockManager(
// Note: if we had a means to discard the disk iterator, we would do that here.
memoryStore.getValues(blockId).get
} else {
- memoryStore.putIterator(blockId, diskIterator, level, classTag) match {
+ memoryStore.putIteratorAsValues(blockId, diskIterator, classTag) match {
case Left(iter) =>
// The memory store put() failed, so it returned the iterator back to us:
iter
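For readability, here is the same dispatch as straight-line code: a condensed sketch, not part of the patch, assuming the surrounding members and locals of `doPutIterator` (`size`, `level`, `iterator()`, `memoryStore`, `diskStore`, `serializerManager`, `iteratorFromFailedMemoryStorePut`), with locking and logging omitted:

// Condensed sketch of the control flow added above. On a failed unroll, the
// deserialized path re-serializes from scratch when spilling, while the
// serialized path resumes from the bytes it already wrote.
if (level.deserialized) {
  memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
    case Right(s) => size = s
    case Left(iter) if level.useDisk =>
      diskStore.put(blockId) { os =>
        serializerManager.dataSerializeStream(blockId, os, iter)(classTag)
      }
      size = diskStore.getSize(blockId)
    case Left(iter) =>
      iteratorFromFailedMemoryStorePut = Some(iter)
  }
} else {
  memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
    case Right(s) => size = s
    case Left(partial) if level.useDisk =>
      diskStore.put(blockId)(partial.finishWritingToStream)
      size = diskStore.getSize(blockId)
    case Left(partial) =>
      iteratorFromFailedMemoryStorePut = Some(partial.valuesIterator)
  }
}

The asymmetry between the two `Left` branches is the point of the patch: the serialized path never re-serializes records it has already written.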
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 90016cbeb8..1a78c9c010 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -17,20 +17,24 @@
package org.apache.spark.storage.memory
+import java.io.OutputStream
+import java.nio.ByteBuffer
import java.util.LinkedHashMap
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
+import com.google.common.io.ByteStreams
+
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.MemoryManager
-import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
-import org.apache.spark.util.io.ChunkedByteBuffer
+import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
private sealed trait MemoryEntry[T] {
def size: Long
@@ -42,8 +46,9 @@ private case class DeserializedMemoryEntry[T](
classTag: ClassTag[T]) extends MemoryEntry[T]
private case class SerializedMemoryEntry[T](
buffer: ChunkedByteBuffer,
- size: Long,
- classTag: ClassTag[T]) extends MemoryEntry[T]
+ classTag: ClassTag[T]) extends MemoryEntry[T] {
+ def size: Long = buffer.size
+}
private[storage] trait BlockEvictionHandler {
/**
@@ -132,7 +137,7 @@ private[spark] class MemoryStore(
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
- val entry = new SerializedMemoryEntry[T](bytes, size, implicitly[ClassTag[T]])
+ val entry = new SerializedMemoryEntry[T](bytes, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
@@ -145,7 +150,7 @@ private[spark] class MemoryStore(
}
/**
- * Attempt to put the given block in memory store.
+ * Attempt to put the given block in memory store as values.
*
* It's possible that the iterator is too large to materialize and store in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while periodically checking
@@ -160,10 +165,9 @@ private[spark] class MemoryStore(
* iterator or call `close()` on it in order to free the storage memory consumed by the
* partially-unrolled block.
*/
- private[storage] def putIterator[T](
+ private[storage] def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
- level: StorageLevel,
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
@@ -218,12 +222,8 @@ private[spark] class MemoryStore(
// We successfully unrolled the entirety of this block
val arrayValues = vector.toArray
vector = null
- val entry = if (level.deserialized) {
+ val entry =
new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
- } else {
- val bytes = serializerManager.dataSerialize(blockId, arrayValues.iterator)(classTag)
- new SerializedMemoryEntry[T](bytes, bytes.size, classTag)
- }
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
// Synchronize so that transfer is atomic
@@ -255,12 +255,8 @@ private[spark] class MemoryStore(
entries.synchronized {
entries.put(blockId, entry)
}
- val bytesOrValues = if (level.deserialized) "values" else "bytes"
- logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
- blockId,
- bytesOrValues,
- Utils.bytesToString(size),
- Utils.bytesToString(maxMemory - blocksMemoryUsed)))
+ logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
+ blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
} else {
assert(currentUnrollMemoryForThisTask >= currentUnrollMemoryForThisTask,
@@ -279,13 +275,117 @@ private[spark] class MemoryStore(
}
}
+ /**
+ * Attempt to put the given block in memory store as bytes.
+ *
+ * It's possible that the iterator is too large to materialize and store in memory. To avoid
+ * OOM exceptions, this method will gradually unroll the iterator while periodically checking
+ * whether there is enough free memory. If the block is successfully materialized, then the
+ * temporary unroll memory used during the materialization is "transferred" to storage memory,
+ * so we won't acquire more memory than is actually needed to store the block.
+ *
+   * @return in case of success, the estimated size of the stored data. In case of failure,
+   *         return a handle which allows the caller to either finish the serialization by
+   *         spilling to disk or to deserialize the partially-serialized block and reconstruct
+   *         the original input iterator. The caller must either fully consume the iterator
+   *         obtained from this handle or call `discard()` on it in order to free the storage
+   *         memory consumed by the partially-unrolled block.
+ */
+ private[storage] def putIteratorAsBytes[T](
+ blockId: BlockId,
+ values: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+
+ require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
+
+ // Whether there is still enough memory for us to continue unrolling this block
+ var keepUnrolling = true
+ // Initial per-task memory to request for unrolling blocks (bytes).
+ val initialMemoryThreshold = unrollMemoryThreshold
+ // Keep track of unroll memory used by this particular block / putIterator() operation
+ var unrollMemoryUsedByThisBlock = 0L
+ // Underlying buffer for unrolling the block
+ val redirectableStream = new RedirectableOutputStream
+ val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
+ redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+ val serializationStream: SerializationStream = {
+ val ser = serializerManager.getSerializer(classTag).newInstance()
+ ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
+ }
+
+ // Request enough memory to begin unrolling
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+
+ if (!keepUnrolling) {
+ logWarning(s"Failed to reserve initial memory threshold of " +
+ s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
+ } else {
+ unrollMemoryUsedByThisBlock += initialMemoryThreshold
+ }
+
+ def reserveAdditionalMemoryIfNecessary(): Unit = {
+ if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
+ val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+ if (keepUnrolling) {
+ unrollMemoryUsedByThisBlock += amountToRequest
+ }
+ }
+ }
+
+ // Unroll this block safely, checking whether we have exceeded our threshold
+ while (values.hasNext && keepUnrolling) {
+ serializationStream.writeObject(values.next())(classTag)
+ reserveAdditionalMemoryIfNecessary()
+ }
+
+ // Make sure that we have enough memory to store the block. By this point, it is possible that
+ // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
+ // perform one final call to attempt to allocate additional memory if necessary.
+ if (keepUnrolling) {
+ serializationStream.close()
+ reserveAdditionalMemoryIfNecessary()
+ }
+
+ if (keepUnrolling) {
+ val entry = SerializedMemoryEntry[T](
+ new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)), classTag)
+ // Synchronize so that transfer is atomic
+ memoryManager.synchronized {
+ releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
+ val success = memoryManager.acquireStorageMemory(blockId, entry.size)
+ assert(success, "transferring unroll memory to storage memory failed")
+ }
+ entries.synchronized {
+ entries.put(blockId, entry)
+ }
+ logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
+        blockId, Utils.bytesToString(entry.size),
+        Utils.bytesToString(maxMemory - blocksMemoryUsed)))
+ Right(entry.size)
+ } else {
+ // We ran out of space while unrolling the values for this block
+ logUnrollFailureMessage(blockId, byteArrayChunkOutputStream.size)
+ Left(
+ new PartiallySerializedBlock(
+ this,
+ serializerManager,
+ blockId,
+ serializationStream,
+ redirectableStream,
+ unrollMemoryUsedByThisBlock,
+ new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)),
+ values,
+ classTag))
+ }
+ }
+
def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
val entry = entries.synchronized { entries.get(blockId) }
entry match {
case null => None
case e: DeserializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getBytes on serialized blocks")
- case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
+ case SerializedMemoryEntry(bytes, _) => Some(bytes)
}
}
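The heart of `putIteratorAsBytes` is the reserve-as-you-grow accounting in `reserveAdditionalMemoryIfNecessary`: after each record is serialized, only the delta between bytes actually buffered and unroll memory already granted is requested. A self-contained toy model of that loop (plain Scala; `cap` and `tryReserve` stand in for the memory manager's grant decision and are illustrative assumptions, not Spark APIs):

// Toy model of putIteratorAsBytes's unroll accounting: reserve the initial
// threshold up front, then after each element reserve only the difference
// between bytes buffered so far and memory already granted.
object UnrollAccounting {
  def unroll(
      values: Iterator[Array[Byte]],
      cap: Long,
      initialThreshold: Long): Either[Long, Long] = {
    var reserved = 0L  // unroll memory granted so far
    var buffered = 0L  // bytes actually written to the chunked buffer
    def tryReserve(amount: Long): Boolean = {  // stand-in for reserveUnrollMemoryForThisTask
      val ok = reserved + amount <= cap
      if (ok) reserved += amount
      ok
    }
    var keepUnrolling = tryReserve(initialThreshold)
    while (values.hasNext && keepUnrolling) {
      buffered += values.next().length  // stand-in for serializationStream.writeObject
      if (buffered > reserved) keepUnrolling = tryReserve(buffered - reserved)
    }
    if (keepUnrolling) Right(reserved) else Left(buffered)
  }
}

For example, `UnrollAccounting.unroll(Iterator.fill(10)(new Array[Byte](100)), cap = 512, initialThreshold = 256)` succeeds through the fifth record (500 bytes buffered) and fails on the sixth, when the 100-byte delta would push the reservation past the cap, mirroring the `Left` branch above.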
@@ -373,7 +473,7 @@ private[spark] class MemoryStore(
def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
val data = entry match {
case DeserializedMemoryEntry(values, _, _) => Left(values)
- case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
+ case SerializedMemoryEntry(buffer, _) => Right(buffer)
}
val newEffectiveStorageLevel =
blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
@@ -507,12 +607,13 @@ private[spark] class MemoryStore(
}
/**
- * The result of a failed [[MemoryStore.putIterator()]] call.
+ * The result of a failed [[MemoryStore.putIteratorAsValues()]] call.
*
- * @param memoryStore the memoryStore, used for freeing memory.
+ * @param memoryStore the memoryStore, used for freeing memory.
* @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
- * @param unrolled an iterator for the partially-unrolled values.
- * @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]].
+ * @param unrolled an iterator for the partially-unrolled values.
+ * @param rest the rest of the original iterator passed to
+ * [[MemoryStore.putIteratorAsValues()]].
*/
private[storage] class PartiallyUnrolledIterator[T](
memoryStore: MemoryStore,
@@ -544,3 +645,81 @@ private[storage] class PartiallyUnrolledIterator[T](
iter = null
}
}
+
+/**
+ * A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
+ */
+private class RedirectableOutputStream extends OutputStream {
+ private[this] var os: OutputStream = _
+ def setOutputStream(s: OutputStream): Unit = { os = s }
+ override def write(b: Int): Unit = os.write(b)
+ override def write(b: Array[Byte]): Unit = os.write(b)
+ override def write(b: Array[Byte], off: Int, len: Int): Unit = os.write(b, off, len)
+ override def flush(): Unit = os.flush()
+ override def close(): Unit = os.close()
+}
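To illustrate the role of `RedirectableOutputStream` in the partial-serialization flow: the serialization stream stays open across the redirect, so bytes written before the switch land in the in-memory buffer and bytes written afterwards land in the new sink. A minimal sketch, where two in-memory sinks stand in for the chunked buffer and the disk stream:

import java.io.ByteArrayOutputStream

// Bytes written before setOutputStream land in the first sink, bytes written
// after it land in the second, while the writer keeps one open stream.
val out = new RedirectableOutputStream  // the class defined above
val memorySink = new ByteArrayOutputStream()  // stands in for ByteArrayChunkOutputStream
out.setOutputStream(memorySink)
out.write("unrolled in memory".getBytes("UTF-8"))

val diskSink = new ByteArrayOutputStream()  // stands in for the disk file stream
out.setOutputStream(diskSink)
out.write(" spilled to disk".getBytes("UTF-8"))
out.close()  // closes only the current sink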
+
+/**
+ * The result of a failed [[MemoryStore.putIteratorAsBytes()]] call.
+ *
+ * @param memoryStore the MemoryStore, used for freeing memory.
+ * @param serializerManager the SerializerManager, used for deserializing values.
+ * @param blockId the block id.
+ * @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
+ * @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
+ * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
+ * @param unrolled a byte buffer containing the partially-serialized values.
+ * @param rest the rest of the original iterator passed to
+ *             [[MemoryStore.putIteratorAsBytes()]].
+ * @param classTag the [[ClassTag]] for the block.
+ */
+private[storage] class PartiallySerializedBlock[T](
+ memoryStore: MemoryStore,
+ serializerManager: SerializerManager,
+ blockId: BlockId,
+ serializationStream: SerializationStream,
+ redirectableOutputStream: RedirectableOutputStream,
+ unrollMemory: Long,
+ unrolled: ChunkedByteBuffer,
+ rest: Iterator[T],
+ classTag: ClassTag[T]) {
+
+ /**
+ * Called to dispose of this block and free its memory.
+ */
+ def discard(): Unit = {
+ try {
+ serializationStream.close()
+ } finally {
+ memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+ }
+ }
+
+ /**
+ * Finish writing this block to the given output stream by first writing the serialized values
+ * and then serializing the values from the original input iterator.
+ */
+ def finishWritingToStream(os: OutputStream): Unit = {
+ ByteStreams.copy(unrolled.toInputStream(), os)
+ redirectableOutputStream.setOutputStream(os)
+ while (rest.hasNext) {
+ serializationStream.writeObject(rest.next())(classTag)
+ }
+ discard()
+ }
+
+ /**
+ * Returns an iterator over the values in this block by first deserializing the serialized
+ * values and then consuming the rest of the original input iterator.
+ *
+ * If the caller does not plan to fully consume the resulting iterator then they must call
+ * `close()` on it to free its resources.
+ */
+ def valuesIterator: PartiallyUnrolledIterator[T] = {
+ new PartiallyUnrolledIterator(
+ memoryStore,
+ unrollMemory,
+ unrolled = serializerManager.dataDeserialize(blockId, unrolled)(classTag),
+ rest = rest)
+ }
+}
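A failed `putIteratorAsBytes` therefore leaves the caller with three mutually exclusive ways to settle the handle, each releasing the unroll memory exactly once: `finishWritingToStream` (spill, reusing the bytes already serialized), `valuesIterator` (recover the values; memory is freed when the iterator is consumed or closed), or `discard` (give up). A sketch of the caller-side contract, with hypothetical surroundings (`level`, `diskStore`, and the placeholder `process` are not defined in this file):

// Sketch of the caller-side contract for PartiallySerializedBlock. Exactly
// one of the three terminal operations should be invoked on the handle.
memoryStore.putIteratorAsBytes(blockId, values, classTag) match {
  case Right(_) =>
    // Block fully stored in memory; nothing to clean up.
  case Left(partial) if level.useDisk =>
    // Reuse the already-serialized prefix, then serialize the remainder.
    diskStore.put(blockId)(partial.finishWritingToStream)
  case Left(partial) =>
    // Recover the values; fully consuming (or closing) the iterator frees
    // the unroll memory held by the partially-serialized prefix.
    partial.valuesIterator.foreach(process)  // process is a placeholder
}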
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
index 8527e3ae69..09e7579ae9 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -27,6 +27,8 @@ private[spark] class ByteBufferOutputStream(capacity: Int) extends ByteArrayOutp
def this() = this(32)
+ def getCount(): Int = count
+
def toByteBuffer: ByteBuffer = {
return ByteBuffer.wrap(buf, 0, count)
}
diff --git a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala b/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
index daac6f971e..16fe3be303 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
@@ -30,10 +30,10 @@ import scala.collection.mutable.ArrayBuffer
private[spark]
class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
- private val chunks = new ArrayBuffer[Array[Byte]]
+ private[this] val chunks = new ArrayBuffer[Array[Byte]]
/** Index of the last chunk. Starting with -1 when the chunks array is empty. */
- private var lastChunkIndex = -1
+ private[this] var lastChunkIndex = -1
/**
* Next position to write in the last chunk.
@@ -41,12 +41,16 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
* If this equals chunkSize, it means for next write we need to allocate a new chunk.
* This can also never be 0.
*/
- private var position = chunkSize
+ private[this] var position = chunkSize
+ private[this] var _size = 0
+
+ def size: Long = _size
override def write(b: Int): Unit = {
allocateNewChunkIfNeeded()
chunks(lastChunkIndex)(position) = b.toByte
position += 1
+ _size += 1
}
override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
@@ -58,6 +62,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
written += thisBatch
position += thisBatch
}
+ _size += len
}
@inline
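The new `_size` field makes the stream's total length an O(1) read instead of a sum over chunk lengths, which matters because `putIteratorAsBytes` polls `size` after every written record. A small usage sketch of the counter, using only the class above:

// size counts every byte written, independent of how writes split across chunks.
val out = new ByteArrayChunkOutputStream(chunkSize = 8)
out.write(Array.fill[Byte](5)(1))  // stays inside the first 8-byte chunk
out.write(Array.fill[Byte](6)(2))  // spills into a second chunk
out.write(3)                       // single-byte write path
assert(out.size == 12)             // 5 + 6 + 1
assert(out.toArrays.map(_.length).sum == 12)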
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 94f6f87740..7a4cb39b14 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1035,7 +1035,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("safely unroll blocks through putIterator (disk)") {
store = makeBlockManager(12000)
- val memAndDisk = StorageLevel.MEMORY_AND_DISK
val memoryStore = store.memoryStore
val diskStore = store.diskStore
val smallList = List.fill(40)(new Array[Byte](100))
@@ -1044,12 +1043,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- store.putIterator("b1", smallIterator, memAndDisk)
- store.putIterator("b2", smallIterator, memAndDisk)
+ store.putIterator("b1", smallIterator, StorageLevel.MEMORY_AND_DISK)
+ store.putIterator("b2", smallIterator, StorageLevel.MEMORY_AND_DISK)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
- val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any)
+ val result3 = memoryStore.putIteratorAsValues("b3", smallIterator, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -1065,7 +1064,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// the block may be stored to disk. During the unrolling process, block "b2" should be kicked
// out, so the memory store should contain only b3, while the disk store should contain
// b1, b2 and b4.
- val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any)
+ val result4 = memoryStore.putIteratorAsValues("b4", bigIterator, ClassTag.Any)
assert(result4.isLeft)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index b4ab67ca15..43e832dc02 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest._
import org.apache.spark._
import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallyUnrolledIterator}
+import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -47,6 +47,8 @@ class MemoryStoreSuite
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+ val serializerManager = new SerializerManager(serializer, conf)
+
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
@@ -61,7 +63,6 @@ class MemoryStoreSuite
def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
- val serializerManager = new SerializerManager(serializer, conf)
val blockInfoManager = new BlockInfoManager
val blockEvictionHandler = new BlockEvictionHandler {
var memoryStore: MemoryStore = _
@@ -121,20 +122,20 @@ class MemoryStoreSuite
val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- def putIterator[T](
+ def putIteratorAsValues[T](
blockId: BlockId,
iter: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
assert(blockInfoManager.lockNewBlockForWriting(
blockId,
new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
- val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+ val res = memoryStore.putIteratorAsValues(blockId, iter, classTag)
blockInfoManager.unlock(blockId)
res
}
// Unroll with all the space in the world. This should succeed.
- var putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ var putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
@@ -145,9 +146,9 @@ class MemoryStoreSuite
blockInfoManager.removeBlock("unroll")
// Unroll with not enough space. This should succeed after kicking out someBlock1.
- assert(putIterator("someBlock1", smallList.iterator, ct).isRight)
- assert(putIterator("someBlock2", smallList.iterator, ct).isRight)
- putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ assert(putIteratorAsValues("someBlock1", smallList.iterator, ct).isRight)
+ assert(putIteratorAsValues("someBlock2", smallList.iterator, ct).isRight)
+ putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
assert(memoryStore.contains("someBlock2"))
@@ -162,8 +163,8 @@ class MemoryStoreSuite
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the meantime, however, we kicked out someBlock2 before giving up.
- assert(putIterator("someBlock3", smallList.iterator, ct).isRight)
- putResult = putIterator("unroll", bigList.iterator, ClassTag.Any)
+ assert(putIteratorAsValues("someBlock3", smallList.iterator, ct).isRight)
+ putResult = putIteratorAsValues("unroll", bigList.iterator, ClassTag.Any)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
assert(!memoryStore.contains("someBlock2"))
assert(putResult.isLeft)
@@ -174,7 +175,7 @@ class MemoryStoreSuite
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
}
- test("safely unroll blocks through putIterator") {
+ test("safely unroll blocks through putIteratorAsValues") {
val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
val smallList = List.fill(40)(new Array[Byte](100))
val bigList = List.fill(40)(new Array[Byte](1000))
@@ -182,21 +183,21 @@ class MemoryStoreSuite
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- def putIterator[T](
+ def putIteratorAsValues[T](
blockId: BlockId,
iter: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
assert(blockInfoManager.lockNewBlockForWriting(
blockId,
new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
- val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+ val res = memoryStore.putIteratorAsValues(blockId, iter, classTag)
blockInfoManager.unlock(blockId)
res
}
// Unroll with plenty of space. This should succeed and cache both blocks.
- val result1 = putIterator("b1", smallIterator, ClassTag.Any)
- val result2 = putIterator("b2", smallIterator, ClassTag.Any)
+ val result1 = putIteratorAsValues("b1", smallIterator, ClassTag.Any)
+ val result2 = putIteratorAsValues("b2", smallIterator, ClassTag.Any)
assert(memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(result1.isRight) // unroll was successful
@@ -211,11 +212,11 @@ class MemoryStoreSuite
blockInfoManager.lockForWriting("b2")
memoryStore.remove("b2")
blockInfoManager.removeBlock("b2")
- putIterator("b1", smallIterator, ClassTag.Any)
- putIterator("b2", smallIterator, ClassTag.Any)
+ putIteratorAsValues("b1", smallIterator, ClassTag.Any)
+ putIteratorAsValues("b2", smallIterator, ClassTag.Any)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
- val result3 = putIterator("b3", smallIterator, ClassTag.Any)
+ val result3 = putIteratorAsValues("b3", smallIterator, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -224,10 +225,10 @@ class MemoryStoreSuite
blockInfoManager.lockForWriting("b3")
assert(memoryStore.remove("b3"))
blockInfoManager.removeBlock("b3")
- putIterator("b3", smallIterator, ClassTag.Any)
+ putIteratorAsValues("b3", smallIterator, ClassTag.Any)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
- val result4 = putIterator("b4", bigIterator, ClassTag.Any)
+ val result4 = putIteratorAsValues("b4", bigIterator, ClassTag.Any)
assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
@@ -238,41 +239,151 @@ class MemoryStoreSuite
assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory
}
+ test("safely unroll blocks through putIteratorAsBytes") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ def putIteratorAsBytes[T](
+ blockId: BlockId,
+ iter: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+ assert(blockInfoManager.lockNewBlockForWriting(
+ blockId,
+ new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = false)))
+ val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag)
+ blockInfoManager.unlock(blockId)
+ res
+ }
+
+ // Unroll with plenty of space. This should succeed and cache both blocks.
+ val result1 = putIteratorAsBytes("b1", smallIterator, ClassTag.Any)
+ val result2 = putIteratorAsBytes("b2", smallIterator, ClassTag.Any)
+ assert(memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(result1.isRight) // unroll was successful
+ assert(result2.isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
+ // would not know how to drop them from memory later.
+ blockInfoManager.lockForWriting("b1")
+ memoryStore.remove("b1")
+ blockInfoManager.removeBlock("b1")
+ blockInfoManager.lockForWriting("b2")
+ memoryStore.remove("b2")
+ blockInfoManager.removeBlock("b2")
+ putIteratorAsBytes("b1", smallIterator, ClassTag.Any)
+ putIteratorAsBytes("b2", smallIterator, ClassTag.Any)
+
+ // Unroll with not enough space. This should succeed but kick out b1 in the process.
+ val result3 = putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
+ assert(result3.isRight)
+ assert(!memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ blockInfoManager.lockForWriting("b3")
+ assert(memoryStore.remove("b3"))
+ blockInfoManager.removeBlock("b3")
+ putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
+
+ // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+ val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
+ assert(result4.isLeft) // unroll was unsuccessful
+ assert(!memoryStore.contains("b1"))
+ assert(!memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(!memoryStore.contains("b4"))
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
+ result4.left.get.discard()
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0) // discard released the unroll memory
+ }
+
+ test("PartiallySerializedBlock.valuesIterator") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+
+ // Unroll huge block with not enough space. This should fail.
+ assert(blockInfoManager.lockNewBlockForWriting(
+ "b1",
+ new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
+ val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+ blockInfoManager.unlock("b1")
+ assert(res.isLeft)
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0)
+ val valuesReturnedFromFailedPut = res.left.get.valuesIterator.toSeq // force materialization
+ valuesReturnedFromFailedPut.zip(bigList).foreach { case (e, a) =>
+ assert(e === a, "PartiallySerializedBlock.valuesIterator() did not return original values!")
+ }
+ // The unroll memory was freed once the iterator was fully traversed.
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ }
+
+ test("PartiallySerializedBlock.finishWritingToStream") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+
+ // Unroll huge block with not enough space. This should fail.
+ assert(blockInfoManager.lockNewBlockForWriting(
+ "b1",
+ new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
+ val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+ blockInfoManager.unlock("b1")
+ assert(res.isLeft)
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0)
+ val bos = new ByteBufferOutputStream()
+ res.left.get.finishWritingToStream(bos)
+ // The unroll memory was freed once the block was fully written.
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ val deserializationStream = serializerManager.dataDeserializeStream[Any](
+ "b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any)
+ deserializationStream.zip(bigList.iterator).foreach { case (e, a) =>
+ assert(e === a,
+ "PartiallySerializedBlock.finishWritingtoStream() did not write original values!")
+ }
+ }
+
test("multiple unrolls by the same thread") {
val (memoryStore, _) = makeMemoryStore(12000)
val smallList = List.fill(40)(new Array[Byte](100))
def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- def putIterator(
+ def putIteratorAsValues(
blockId: BlockId,
iter: Iterator[Any]): Either[PartiallyUnrolledIterator[Any], Long] = {
- memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, ClassTag.Any)
+ memoryStore.putIteratorAsValues(blockId, iter, ClassTag.Any)
}
// All unroll memory used is released because putIterator did not return an iterator
- assert(putIterator("b1", smallIterator).isRight)
+ assert(putIteratorAsValues("b1", smallIterator).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- assert(putIterator("b2", smallIterator).isRight)
+ assert(putIteratorAsValues("b2", smallIterator).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll memory is not released because putIterator returned an iterator
// that still depends on the underlying vector used in the process
- assert(putIterator("b3", smallIterator).isLeft)
+ assert(putIteratorAsValues("b3", smallIterator).isLeft)
val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB3 > 0)
// The unroll memory owned by this thread builds on top of its value after the previous unrolls
- assert(putIterator("b4", smallIterator).isLeft)
+ assert(putIteratorAsValues("b4", smallIterator).isLeft)
val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
// ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- assert(putIterator("b5", smallIterator).isLeft)
+ assert(putIteratorAsValues("b5", smallIterator).isLeft)
val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- assert(putIterator("b6", smallIterator).isLeft)
+ assert(putIteratorAsValues("b6", smallIterator).isLeft)
val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- assert(putIterator("b7", smallIterator).isLeft)
+ assert(putIteratorAsValues("b7", smallIterator).isLeft)
val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)