Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/MemoryStore.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 257
1 file changed, 257 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
new file mode 100644
index 0000000000..828dc0f22d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+import java.util.LinkedHashMap
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SizeEstimator, Utils}
+
+/**
+ * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
+ * serialized ByteBuffers.
+ */
+private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
+ extends BlockStore(blockManager) {
+
+ case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
+
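+  // The map is created with accessOrder = true, so its iteration order runs from
+  // least- to most-recently-accessed entry; ensureFreeSpace relies on this to
+  // pick eviction candidates in LRU order.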
+ private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
+ private var currentMemory = 0L
+  // Object used to ensure that only one thread is putting blocks and, if necessary, dropping
+  // blocks from the memory store.
+ private val putLock = new Object()
+
+ logInfo("MemoryStore started with capacity %s.".format(Utils.bytesToString(maxMemory)))
+
+ def freeMemory: Long = maxMemory - currentMemory
+
+ override def getSize(blockId: String): Long = {
+ entries.synchronized {
+ entries.get(blockId).size
+ }
+ }
+
+ override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel) {
+    // Work on a duplicate, since the original input might be used elsewhere.
+ val bytes = _bytes.duplicate()
+ bytes.rewind()
+ if (level.deserialized) {
+ val values = blockManager.dataDeserialize(blockId, bytes)
+ val elements = new ArrayBuffer[Any]
+ elements ++= values
+ val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
+ tryToPut(blockId, elements, sizeEstimate, true)
+ } else {
+ tryToPut(blockId, bytes, bytes.limit, false)
+ }
+ }
+
+ override def putValues(
+ blockId: String,
+ values: ArrayBuffer[Any],
+ level: StorageLevel,
+ returnValues: Boolean)
+ : PutResult = {
+
+ if (level.deserialized) {
+ val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
+ tryToPut(blockId, values, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(values.iterator))
+ } else {
+ val bytes = blockManager.dataSerialize(blockId, values.iterator)
+ tryToPut(blockId, bytes, bytes.limit, false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
+ }
+ }
+
+ override def getBytes(blockId: String): Option[ByteBuffer] = {
+ val entry = entries.synchronized {
+ entries.get(blockId)
+ }
+ if (entry == null) {
+ None
+ } else if (entry.deserialized) {
+ Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
+ } else {
+ Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
+ }
+ }
+
+ override def getValues(blockId: String): Option[Iterator[Any]] = {
+ val entry = entries.synchronized {
+ entries.get(blockId)
+ }
+ if (entry == null) {
+ None
+ } else if (entry.deserialized) {
+ Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ } else {
+ val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
+ Some(blockManager.dataDeserialize(blockId, buffer))
+ }
+ }
+
+ override def remove(blockId: String): Boolean = {
+ entries.synchronized {
+ val entry = entries.get(blockId)
+ if (entry != null) {
+ entries.remove(blockId)
+ currentMemory -= entry.size
+ logInfo("Block %s of size %d dropped from memory (free %d)".format(
+ blockId, entry.size, freeMemory))
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ override def clear() {
+ entries.synchronized {
+ entries.clear()
+ }
+ logInfo("MemoryStore cleared")
+ }
+
+ /**
+ * Return the RDD ID that a given block ID is from, or null if it is not an RDD block.
+ */
+ private def getRddId(blockId: String): String = {
+ if (blockId.startsWith("rdd_")) {
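+      // Cached RDD partition blocks are named "rdd_<rddId>_<splitIndex>", so the
+      // second underscore-separated field is the RDD ID.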
+ blockId.split('_')(1)
+ } else {
+ null
+ }
+ }
+
+ /**
+ * Try to put in a set of values, if we can free up enough space. The value should either be
+ * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
+ * size must also be passed by the caller.
+ *
+   * Locks on the object putLock to ensure that each put request and its associated block
+   * dropping is done by only one thread at a time. Otherwise, while one thread is dropping
+ * blocks to free memory for one block, another thread may use up the freed space for
+ * another block.
+ */
+ private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
+    // TODO: It's possible to optimize the locking by locking entries only when selecting blocks
+    // to be dropped. Once the to-be-dropped blocks have been selected, and the lock on entries
+    // has been released, it must be ensured that those to-be-dropped blocks are not double
+    // counted for freeing up more space for another block that needs to be put. Only then can
+    // the actual dropping of blocks (and writing to disk if necessary) proceed in parallel.
+ putLock.synchronized {
+ if (ensureFreeSpace(blockId, size)) {
+        val entry = Entry(value, size, deserialized)
+ entries.synchronized { entries.put(blockId, entry) }
+ currentMemory += size
+ if (deserialized) {
+ logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
+ blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
+ } else {
+ logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
+ blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
+ }
+ true
+ } else {
+ // Tell the block manager that we couldn't put it in memory so that it can drop it to
+ // disk if the block allows disk storage.
+ val data = if (deserialized) {
+ Left(value.asInstanceOf[ArrayBuffer[Any]])
+ } else {
+ Right(value.asInstanceOf[ByteBuffer].duplicate())
+ }
+ blockManager.dropFromMemory(blockId, data)
+ false
+ }
+ }
+ }
+
+ /**
+ * Tries to free up a given amount of space to store a particular block, but can fail and return
+ * false if either the block is bigger than our memory or it would require replacing another
+ * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
+   * don't fit into memory, which we want to avoid).
+ *
+ * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
+ * Otherwise, the freed space may fill up before the caller puts in their new value.
+ */
+ private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
+
+ logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
+ space, currentMemory, maxMemory))
+
+ if (space > maxMemory) {
+ logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
+ return false
+ }
+
+ if (maxMemory - currentMemory < space) {
+ val rddToAdd = getRddId(blockIdToAdd)
+ val selectedBlocks = new ArrayBuffer[String]()
+ var selectedMemory = 0L
+
+ // This is synchronized to ensure that the set of entries is not changed
+      // (because of getValues or getBytes) while traversing the iterator, as that
+ // can lead to exceptions.
+ entries.synchronized {
+ val iterator = entries.entrySet().iterator()
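+        // Walk blocks from least- to most-recently used, selecting candidates
+        // until dropping them all would leave enough free memory.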
+ while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+ val pair = iterator.next()
+ val blockId = pair.getKey
+ if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
+ logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
+ "block from the same RDD")
+ return false
+ }
+ selectedBlocks += blockId
+ selectedMemory += pair.getValue.size
+ }
+ }
+
+ if (maxMemory - (currentMemory - selectedMemory) >= space) {
+ logInfo(selectedBlocks.size + " blocks selected for dropping")
+ for (blockId <- selectedBlocks) {
+ val entry = entries.synchronized { entries.get(blockId) }
+ // This should never be null as only one thread should be dropping
+        // blocks and removing entries. However, the check is still here for
+ // future safety.
+ if (entry != null) {
+ val data = if (entry.deserialized) {
+ Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+ } else {
+ Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+ }
+ blockManager.dropFromMemory(blockId, data)
+ }
+ }
+ return true
+ } else {
+ return false
+ }
+ }
+ return true
+ }
+
+ override def contains(blockId: String): Boolean = {
+ entries.synchronized { entries.containsKey(blockId) }
+ }
+}
+
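
Aside (not part of the commit above): the eviction order used by ensureFreeSpace
comes entirely from constructing the entries map with accessOrder = true, which
makes LinkedHashMap iterate from least- to most-recently-accessed entry. A minimal
standalone Scala sketch of that behavior, using hypothetical block names:

    import java.util.LinkedHashMap

    object LruOrderSketch {
      def main(args: Array[String]): Unit = {
        // Same constructor arguments as MemoryStore's entries map:
        // initial capacity 32, load factor 0.75, accessOrder = true.
        val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
        entries.put("rdd_0_0", 100L)
        entries.put("rdd_0_1", 100L)
        entries.put("rdd_1_0", 100L)

        // Reading a block moves it to the most-recently-used end, so the
        // eviction loop in ensureFreeSpace would consider it last.
        entries.get("rdd_0_0")

        val it = entries.entrySet().iterator()
        while (it.hasNext) {
          println(it.next().getKey) // prints rdd_0_1, rdd_1_0, rdd_0_0
        }
      }
    }

This is also why ensureFreeSpace traverses the iterator inside entries.synchronized:
on an access-ordered LinkedHashMap even a plain get() counts as a structural
modification, so a concurrent getValues or getBytes would invalidate the iterator.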
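
Similarly, the "Doesn't actually copy the data" comments in getBytes and getValues
rest on the documented semantics of ByteBuffer.duplicate(): the duplicate shares the
original buffer's content but carries its own position, limit, and mark. A small
sketch (hypothetical values, not from the commit):

    import java.nio.ByteBuffer

    object DuplicateSketch {
      def main(args: Array[String]): Unit = {
        val original = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
        val dup = original.duplicate() // no data is copied

        // Independent cursors: draining the duplicate leaves the original's
        // position untouched, so each reader can be handed its own duplicate.
        dup.get(); dup.get()
        println(original.position()) // 0
        println(dup.position())      // 2

        // Shared content: an absolute write through one is visible in both.
        dup.put(2, 42.toByte)
        println(original.get(2)) // 42
      }
    }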