author    Josh Rosen <joshrosen@databricks.com>  2016-03-21 17:19:39 -0700
committer Josh Rosen <joshrosen@databricks.com>  2016-03-21 17:19:39 -0700
commit    b5f1ab701a167a728bb006e01b392b203da84391 (patch)
tree      af002e40214b1029e7d6f99b7b61389dbdac6895 /core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
parent    b3e5af62a1c9e11556c4721164e6539d7ecce8e7 (diff)
[SPARK-13990] Automatically pick serializer when caching RDDs
Building on the `SerializerManager` introduced in SPARK-13926 / #11755, this patch modifies Spark's BlockManager to use RDDs' ClassTags in order to select the best serializer to use when caching RDD blocks.

When storing a local block, the BlockManager `put()` methods use implicits to record ClassTags and store those tags in the blocks' BlockInfo records. When reading a local block, the stored ClassTag is used to pick the appropriate serializer. When a block is stored with replication, the class tag is written into the block transfer metadata and will also be stored in the remote BlockManager.

There are two or three places where we don't properly pass ClassTags, including TorrentBroadcast and BlockRDD. I think this happens to work because the missing ClassTag always happens to be `ClassTag.Any`, but it might be worth looking more carefully at those places to see whether we should be more explicit.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11801 from JoshRosen/pick-best-serializer-for-caching.
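To make the ClassTag-recording mechanism concrete, here is a minimal sketch (`CachedBlock` and `cache` are toy names, not the patch's code). A `ClassTag` context bound makes the compiler materialize the runtime element type at each call site, so a store can keep the tag next to the data and later choose a serializer that matches it:

    import scala.reflect.ClassTag

    // Toy sketch: capture the caller's element type via a context bound.
    case class CachedBlock[T](values: Array[T], tag: ClassTag[T])

    def cache[T: ClassTag](values: Array[T]): CachedBlock[T] =
      CachedBlock(values, implicitly[ClassTag[T]])

    val block = cache(Array(1, 2, 3))
    println(block.tag)  // prints "Int" -- enough to pick an Int-aware serializer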
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala  99
1 file changed, 59 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 94171324f8..d370ee912a 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -21,6 +21,7 @@ import java.util.LinkedHashMap
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
@@ -30,11 +31,18 @@ import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
import org.apache.spark.util.io.ChunkedByteBuffer
-private sealed trait MemoryEntry {
- val size: Long
+private sealed trait MemoryEntry[T] {
+ def size: Long
+ def classTag: ClassTag[T]
}
-private case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry
-private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry
+private case class DeserializedMemoryEntry[T](
+ value: Array[T],
+ size: Long,
+ classTag: ClassTag[T]) extends MemoryEntry[T]
+private case class SerializedMemoryEntry[T](
+ buffer: ChunkedByteBuffer,
+ size: Long,
+ classTag: ClassTag[T]) extends MemoryEntry[T]
/**
* Stores blocks in memory, either as Arrays of deserialized Java objects or as
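The hunk above also switches the trait from `val size` to `def size`, which lets the case classes satisfy it with constructor `val`s while keeping the trait abstract. A hedged sketch of the resulting pattern (toy `Entry`/`Deserialized` types, not Spark's): every entry carries its own tag, so a heterogeneous `MemoryEntry[_]` map can still report the right element type per block:

    import scala.collection.mutable
    import scala.reflect.ClassTag

    sealed trait Entry[T] { def size: Long; def classTag: ClassTag[T] }
    case class Deserialized[T](values: Array[T], size: Long, classTag: ClassTag[T])
      extends Entry[T]

    val entries = mutable.LinkedHashMap[String, Entry[_]]()
    entries("rdd_0_0") = Deserialized(Array("a", "b"), 2L, implicitly[ClassTag[String]])
    entries("rdd_1_0") = Deserialized(Array(1, 2), 2L, implicitly[ClassTag[Int]])

    entries.values.foreach(e => println(e.classTag))  // prints "String", then "Int"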
@@ -49,7 +57,7 @@ private[spark] class MemoryStore(
// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
- private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
+ private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
// All accesses of this map are assumed to have manually synchronized on `memoryManager`
@@ -95,13 +103,16 @@ private[spark] class MemoryStore(
*
* @return true if the put() succeeded, false otherwise.
*/
- def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): Boolean = {
+ def putBytes[T: ClassTag](
+ blockId: BlockId,
+ size: Long,
+ _bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
if (memoryManager.acquireStorageMemory(blockId, size)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
- val entry = new SerializedMemoryEntry(bytes, size)
+ val entry = new SerializedMemoryEntry[T](bytes, size, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
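For reference, the `[T: ClassTag]` syntax in the new `putBytes` signature is sugar for an extra implicit parameter list, and `implicitly[ClassTag[T]]` simply retrieves what the caller supplied. A small equivalence sketch (my own example, not from the patch):

    import scala.reflect.ClassTag

    // The two signatures below are equivalent:
    def tagOfA[T: ClassTag]: ClassTag[T] = implicitly[ClassTag[T]]
    def tagOfB[T](implicit tag: ClassTag[T]): ClassTag[T] = tag

    assert(tagOfA[Double] == tagOfB[Double])  // both yield ClassTag.Double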
@@ -129,10 +140,11 @@ private[spark] class MemoryStore(
* iterator or call `close()` on it in order to free the storage memory consumed by the
* partially-unrolled block.
*/
- private[storage] def putIterator(
+ private[storage] def putIterator[T](
blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = {
+ values: Iterator[T],
+ level: StorageLevel,
+ classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
@@ -151,7 +163,7 @@ private[spark] class MemoryStore(
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
// Underlying vector for unrolling the block
- var vector = new SizeTrackingVector[Any]
+ var vector = new SizeTrackingVector[T]()(classTag)
// Request enough memory to begin unrolling
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
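Note the `new SizeTrackingVector[T]()(classTag)` call above: the second argument list fills the vector's implicit ClassTag parameter explicitly, since here the tag arrives as a plain method argument rather than an implicit. The same move works with standard-library methods that take a ClassTag, e.g. `Array.ofDim` (a hedged sketch, my own example):

    import scala.reflect.ClassTag

    // Pass a ClassTag explicitly where the compiler would normally infer one:
    def makeArray[T](n: Int, tag: ClassTag[T]): Array[T] =
      Array.ofDim[T](n)(tag)  // explicit value for the implicit parameter list

    val xs = makeArray(4, ClassTag.Int)  // a primitive int[4], not an Object[4]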
@@ -187,10 +199,10 @@ private[spark] class MemoryStore(
val arrayValues = vector.toArray
vector = null
val entry = if (level.deserialized) {
- new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues))
+ new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
} else {
- val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
- new SerializedMemoryEntry(bytes, bytes.size)
+ val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)(classTag)
+ new SerializedMemoryEntry[T](bytes, bytes.size, classTag)
}
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
@@ -248,19 +260,21 @@ private[spark] class MemoryStore(
val entry = entries.synchronized { entries.get(blockId) }
entry match {
case null => None
- case e: DeserializedMemoryEntry =>
+ case e: DeserializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getBytes on serialized blocks")
- case SerializedMemoryEntry(bytes, _) => Some(bytes)
+ case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
}
}
- def getValues(blockId: BlockId): Option[Iterator[Any]] = {
+ def getValues(blockId: BlockId): Option[Iterator[_]] = {
val entry = entries.synchronized { entries.get(blockId) }
entry match {
case null => None
- case e: SerializedMemoryEntry =>
+ case e: SerializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getValues on deserialized blocks")
- case DeserializedMemoryEntry(values, _) => Some(values.iterator)
+ case DeserializedMemoryEntry(values, _, _) =>
+ val x = Some(values)
+ x.map(_.iterator)
}
}
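The two-step `val x = Some(values); x.map(_.iterator)` in `getValues` looks redundant; my reading (hedged) is that it guides type inference when the element type is existential, since `values` comes out of the generic case-class match as `Array[_]`. Functionally it amounts to `Some(values.iterator)`. A toy illustration of the same shape:

    // `arr` is bound with existential type Array[_], as `values` is above.
    def valuesIterator(entry: Any): Option[Iterator[_]] = entry match {
      case arr: Array[_] =>
        val x = Some(arr)   // Option[Array[_]]
        x.map(_.iterator)   // Option[Iterator[_]]
      case _ => None
    }

    println(valuesIterator(Array(1, 2, 3)).map(_.toList))  // Some(List(1, 2, 3))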
@@ -333,6 +347,24 @@ private[spark] class MemoryStore(
}
}
+ def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
+ val data = entry match {
+ case DeserializedMemoryEntry(values, _, _) => Left(values)
+ case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
+ }
+ val newEffectiveStorageLevel =
+ blockManager.dropFromMemory(blockId, () => data)(entry.classTag)
+ if (newEffectiveStorageLevel.isValid) {
+ // The block is still present in at least one store, so release the lock
+ // but don't delete the block info
+ blockManager.releaseLock(blockId)
+ } else {
+ // The block isn't present in any store, so delete the block info so that the
+ // block can be stored again
+ blockManager.blockInfoManager.removeBlock(blockId)
+ }
+ }
+
if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
@@ -341,20 +373,7 @@ private[spark] class MemoryStore(
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
- val data = entry match {
- case DeserializedMemoryEntry(values, _) => Left(values)
- case SerializedMemoryEntry(buffer, _) => Right(buffer)
- }
- val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
- if (newEffectiveStorageLevel.isValid) {
- // The block is still present in at least one store, so release the lock
- // but don't delete the block info
- blockManager.releaseLock(blockId)
- } else {
- // The block isn't present in any store, so delete the block info so that the
- // block can be stored again
- blockManager.blockInfoManager.removeBlock(blockId)
- }
+ dropBlock(blockId, entry)
}
}
freedMemory
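The refactored drop path funnels both block representations through one `Either`: deserialized values travel as `Left` and serialized bytes as `Right`, so a single code path can dispatch on the side. A hedged sketch of that convention with toy payloads (not Spark's types):

    import java.nio.ByteBuffer

    def describe(data: Either[Array[_], ByteBuffer]): String = data match {
      case Left(values) => s"deserialized block, ${values.length} objects"
      case Right(bytes) => s"serialized block, ${bytes.remaining()} bytes"
    }

    println(describe(Left(Array("a", "b"))))           // deserialized block, 2 objects
    println(describe(Right(ByteBuffer.allocate(64))))  // serialized block, 64 bytes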
@@ -470,16 +489,16 @@ private[spark] class MemoryStore(
* @param unrolled an iterator for the partially-unrolled values.
* @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]].
*/
-private[storage] class PartiallyUnrolledIterator(
+private[storage] class PartiallyUnrolledIterator[T](
memoryStore: MemoryStore,
unrollMemory: Long,
- unrolled: Iterator[Any],
- rest: Iterator[Any])
- extends Iterator[Any] {
+ unrolled: Iterator[T],
+ rest: Iterator[T])
+ extends Iterator[T] {
private[this] var unrolledIteratorIsConsumed: Boolean = false
- private[this] var iter: Iterator[Any] = {
- val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, {
+ private[this] var iter: Iterator[T] = {
+ val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
unrolledIteratorIsConsumed = true
memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
})
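The `CompletionIterator` used above runs a callback once its wrapped iterator is exhausted, which is how the unroll memory gets released exactly once. A minimal sketch of the idea (`CompletingIterator` is a toy stand-in, not Spark's implementation):

    // Run `onDone` once, when the wrapped iterator runs dry.
    class CompletingIterator[A](underlying: Iterator[A], onDone: () => Unit)
        extends Iterator[A] {
      private var fired = false
      override def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !fired) { fired = true; onDone() }
        more
      }
      override def next(): A = underlying.next()
    }

    val it = new CompletingIterator(Iterator(1, 2), () => println("unroll memory released"))
    it.toList  // drains the source, then prints "unroll memory released" once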
@@ -487,7 +506,7 @@ private[storage] class PartiallyUnrolledIterator(
}
override def hasNext: Boolean = iter.hasNext
- override def next(): Any = iter.next()
+ override def next(): T = iter.next()
/**
* Called to dispose of this iterator and free its memory.