From 7bcb08cef5e6438ce8c8efa3da3a8f94f2a1fbf9 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 27 Sep 2012 17:50:59 -0700 Subject: Renamed storage levels to something cleaner; fixes #223. --- core/src/main/scala/spark/KryoSerializer.scala | 4 ++-- core/src/main/scala/spark/RDD.scala | 4 ++-- .../main/scala/spark/broadcast/BitTorrentBroadcast.scala | 4 ++-- core/src/main/scala/spark/broadcast/HttpBroadcast.scala | 4 ++-- core/src/main/scala/spark/broadcast/TreeBroadcast.scala | 4 ++-- core/src/main/scala/spark/storage/BlockMessage.scala | 2 +- .../src/main/scala/spark/storage/BlockMessageArray.scala | 2 +- core/src/main/scala/spark/storage/StorageLevel.scala | 16 ++++++++-------- 8 files changed, 20 insertions(+), 20 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala index 4b95e05bd3..244d50f49c 100644 --- a/core/src/main/scala/spark/KryoSerializer.scala +++ b/core/src/main/scala/spark/KryoSerializer.scala @@ -192,8 +192,8 @@ class KryoSerializer extends Serializer with Logging { (1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1, 1), None, ByteBuffer.allocate(1), - StorageLevel.MEMORY_ONLY_DESER, - PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY_DESER), + StorageLevel.MEMORY_ONLY, + PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY), GotBlock("1", ByteBuffer.allocate(1)), GetBlock("1") ) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index cce0ea2183..b0d86ebbae 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -89,14 +89,14 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } // Turn on the default caching level for this RDD - def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY_DESER) + def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY) // Turn on the default caching level for this RDD def cache(): RDD[T] = persist() def getStorageLevel = storageLevel - def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER_2): RDD[T] = { + def checkpoint(level: StorageLevel = StorageLevel.MEMORY_AND_DISK_2): RDD[T] = { if (!level.useDisk && level.replication < 2) { throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")") } diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala index 016dc00fb0..0bb9937992 100644 --- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala @@ -18,7 +18,7 @@ extends Broadcast[T] with Logging with Serializable { MultiTracker.synchronized { SparkEnv.get.blockManager.putSingle( - uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) + uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) } @transient var arrayOfBlocks: Array[BroadcastBlock] = null @@ -135,7 +135,7 @@ extends Broadcast[T] with Logging with Serializable { if (receptionSucceeded) { value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) SparkEnv.get.blockManager.putSingle( - uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) + uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) } else { logError("Reading Broadcasted variable " + uuid + " failed") } diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index eacf237508..a98cbb6994 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -19,7 +19,7 @@ extends Broadcast[T] with Logging with Serializable { HttpBroadcast.synchronized { SparkEnv.get.blockManager.putSingle( - uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) + uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) } if (!isLocal) { @@ -37,7 +37,7 @@ extends Broadcast[T] with Logging with Serializable { val start = System.nanoTime value_ = HttpBroadcast.read[T](uuid) SparkEnv.get.blockManager.putSingle( - uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) + uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) val time = (System.nanoTime - start) / 1e9 logInfo("Reading broadcast variable " + uuid + " took " + time + " s") } diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala index c9e1e67d87..3b54d570be 100644 --- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala @@ -17,7 +17,7 @@ extends Broadcast[T] with Logging with Serializable { MultiTracker.synchronized { SparkEnv.get.blockManager.putSingle( - uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) + uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) } @transient var arrayOfBlocks: Array[BroadcastBlock] = null @@ -112,7 +112,7 @@ extends Broadcast[T] with Logging with Serializable { if (receptionSucceeded) { value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) SparkEnv.get.blockManager.putSingle( - uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) + uuid.toString, value_, StorageLevel.MEMORY_ONLY, false) } else { logError("Reading Broadcasted variable " + uuid + " failed") } diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala index 5e2ccb199a..4b5cfebba2 100644 --- a/core/src/main/scala/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/spark/storage/BlockMessage.scala @@ -196,7 +196,7 @@ object BlockMessage { def main(args: Array[String]) { val B = new BlockMessage() - B.set(new PutBlock("ABC", ByteBuffer.allocate(10), StorageLevel.DISK_AND_MEMORY_2)) + B.set(new PutBlock("ABC", ByteBuffer.allocate(10), StorageLevel.MEMORY_AND_DISK_SER_2)) val bMsg = B.toBufferMessage val C = new BlockMessage() C.set(bMsg) diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala index 928857056f..64acc7eb47 100644 --- a/core/src/main/scala/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala @@ -99,7 +99,7 @@ object BlockMessageArray { if (i % 2 == 0) { val buffer = ByteBuffer.allocate(100) buffer.clear - BlockMessage.fromPutBlock(PutBlock(i.toString, buffer, StorageLevel.MEMORY_ONLY)) + BlockMessage.fromPutBlock(PutBlock(i.toString, buffer, StorageLevel.MEMORY_ONLY_SER)) } else { BlockMessage.fromGetBlock(GetBlock(i.toString)) } diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala index b168c8e869..2d52fac1ef 100644 --- a/core/src/main/scala/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/spark/storage/StorageLevel.scala @@ -67,12 +67,12 @@ object StorageLevel { val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) - val MEMORY_ONLY = new StorageLevel(false, true, false) - val MEMORY_ONLY_2 = new StorageLevel(false, true, false, 2) - val MEMORY_ONLY_DESER = new StorageLevel(false, true, true) - val MEMORY_ONLY_DESER_2 = new StorageLevel(false, true, true, 2) - val DISK_AND_MEMORY = new StorageLevel(true, true, false) - val DISK_AND_MEMORY_2 = new StorageLevel(true, true, false, 2) - val DISK_AND_MEMORY_DESER = new StorageLevel(true, true, true) - val DISK_AND_MEMORY_DESER_2 = new StorageLevel(true, true, true, 2) + val MEMORY_ONLY = new StorageLevel(false, true, true) + val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) + val MEMORY_ONLY_SER = new StorageLevel(false, true, false) + val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) + val MEMORY_AND_DISK = new StorageLevel(true, true, true) + val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) + val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) + val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) } -- cgit v1.2.3