From 83143f9a5f92ca5c341332c809f0adf7e58885b6 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Sun, 30 Sep 2012 21:19:39 -0700
Subject: Fixed several bugs that caused weird behavior with files in spark-shell:

- SizeEstimator was following through a ClassLoader field of Hadoop JobConfs,
  which referenced the whole interpreter, Scala compiler, etc. Chaos ensued,
  giving an estimated size in the tens of gigabytes.

- Broadcast variables in local mode were only stored as MEMORY_ONLY and never
  made accessible over a server, so they fell out of the cache when they were
  deemed too large and couldn't be reloaded.
---
 core/src/main/scala/spark/SizeEstimator.scala                 | 10 +++++++---
 core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala |  8 ++++----
 core/src/main/scala/spark/broadcast/HttpBroadcast.scala       |  6 +++---
 core/src/main/scala/spark/broadcast/TreeBroadcast.scala       |  4 ++--
 core/src/main/scala/spark/storage/BlockManager.scala          |  4 +++-
 5 files changed, 19 insertions(+), 13 deletions(-)

(limited to 'core/src/main')

diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala
index aadd475868..71b9c1f62a 100644
--- a/core/src/main/scala/spark/SizeEstimator.scala
+++ b/core/src/main/scala/spark/SizeEstimator.scala
@@ -77,10 +77,10 @@ object SizeEstimator extends Logging {
       return System.getProperty("spark.test.useCompressedOops").toBoolean
     }
     try {
-      val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic";
-      val server = ManagementFactory.getPlatformMBeanServer();
+      val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
+      val server = ManagementFactory.getPlatformMBeanServer()
       val bean = ManagementFactory.newPlatformMXBeanProxy(server,
-        hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean]);
+        hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean])
       return bean.getVMOption("UseCompressedOops").getValue.toBoolean
     } catch {
       case e: Exception => {
@@ -142,6 +142,10 @@ object SizeEstimator extends Logging {
     val cls = obj.getClass
     if (cls.isArray) {
       visitArray(obj, cls, state)
+    } else if (classOf[ClassLoader].isAssignableFrom(cls) || classOf[Class[_]].isAssignableFrom(cls)) {
+      // Hadoop JobConfs created in the interpreter have a ClassLoader, which greatly confuses
+      // the size estimator since it references the whole REPL. Do nothing in this case. In
+      // general all ClassLoaders and Classes will be shared between objects anyway.
     } else {
       val classInfo = getClassInfo(cls)
       state.size += classInfo.shellSize
diff --git a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
index 0bb9937992..0b9647d168 100644
--- a/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -18,7 +18,7 @@ extends Broadcast[T] with Logging with Serializable {

   MultiTracker.synchronized {
     SparkEnv.get.blockManager.putSingle(
-      uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+      uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
   }

   @transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -53,7 +53,7 @@ extends Broadcast[T] with Logging with Serializable {

   // Must call this after all the variables have been created/initialized
   if (!isLocal) {
-    sendBroadcast
+    sendBroadcast()
   }

   def sendBroadcast() {
@@ -119,7 +119,7 @@ extends Broadcast[T] with Logging with Serializable {
         logInfo("Started reading broadcast variable " + uuid)
         // Initializing everything because Master will only send null/0 values
         // Only the 1st worker in a node can be here. Others will get from cache
-        initializeWorkerVariables
+        initializeWorkerVariables()

         logInfo("Local host address: " + hostAddress)

@@ -135,7 +135,7 @@ extends Broadcast[T] with Logging with Serializable {
         if (receptionSucceeded) {
           value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
           SparkEnv.get.blockManager.putSingle(
-            uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+            uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
         } else {
           logError("Reading Broadcasted variable " + uuid + " failed")
         }
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 64037fb2d5..f5f2b3dbf2 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -19,7 +19,7 @@ extends Broadcast[T] with Logging with Serializable {

   HttpBroadcast.synchronized {
     SparkEnv.get.blockManager.putSingle(
-      uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+      uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
   }

   if (!isLocal) {
@@ -37,7 +37,7 @@ extends Broadcast[T] with Logging with Serializable {
       val start = System.nanoTime
       value_ = HttpBroadcast.read[T](uuid)
       SparkEnv.get.blockManager.putSingle(
-        uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+        uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
       val time = (System.nanoTime - start) / 1e9
       logInfo("Reading broadcast variable " + uuid + " took " + time + " s")
     }
@@ -80,8 +80,8 @@ private object HttpBroadcast extends Logging {
       if (server != null) {
         server.stop()
         server = null
-        initialized = false
       }
+      initialized = false
     }
   }
diff --git a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
index 3b54d570be..574477a5fc 100644
--- a/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -17,7 +17,7 @@ extends Broadcast[T] with Logging with Serializable {

   MultiTracker.synchronized {
     SparkEnv.get.blockManager.putSingle(
-      uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+      uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
   }

   @transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -112,7 +112,7 @@ extends Broadcast[T] with Logging with Serializable {
         if (receptionSucceeded) {
           value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
           SparkEnv.get.blockManager.putSingle(
-            uuid.toString, value_, StorageLevel.MEMORY_ONLY, false)
+            uuid.toString, value_, StorageLevel.MEMORY_AND_DISK, false)
         } else {
           logError("Reading Broadcasted variable " + uuid + " failed")
         }
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 224c55d9d7..5384274d65 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -545,9 +545,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     }

     if (level.useMemory) {
+      bytes.rewind()
       memoryStore.putBytes(blockId, bytes, level)
     }
     if (level.useDisk) {
+      bytes.rewind()
       diskStore.putBytes(blockId, bytes, level)
     }

@@ -639,7 +641,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory")
       return
     }
-    memoryStore.remove(blockId) 
+    memoryStore.remove(blockId)
     val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
     setLevelAndTellMaster(blockId, newLevel)
   }
--
cgit v1.2.3
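
To see why the SizeEstimator guard matters, here is a minimal, self-contained
sketch of the traversal it protects. This is not Spark's actual SizeEstimator
(which also does size accounting); ReachabilitySketch and countReachable are
hypothetical names for illustration. The sketch walks an object graph
reflectively but treats ClassLoader and Class instances as opaque leaves, the
same guard the patch adds, so an object created in the REPL no longer drags
the whole interpreter into the walk.

    import java.lang.reflect.Modifier
    import java.util.{Collections, IdentityHashMap}
    import scala.collection.mutable

    object ReachabilitySketch {
      // Count objects reachable from `root`, skipping the fields of any
      // ClassLoader or Class -- the guard this patch adds to SizeEstimator.
      def countReachable(root: AnyRef): Int = {
        // Identity-based visited set: equals()-based sets would conflate
        // distinct but equal objects.
        val seen = Collections.newSetFromMap(new IdentityHashMap[AnyRef, java.lang.Boolean]())
        val stack = mutable.Stack[AnyRef](root)
        var count = 0
        while (stack.nonEmpty) {
          val obj = stack.pop()
          if (obj != null && seen.add(obj)) {
            count += 1
            val cls = obj.getClass
            if (classOf[ClassLoader].isAssignableFrom(cls) || classOf[Class[_]].isAssignableFrom(cls)) {
              // Opaque leaf: following these fields could pull in the whole
              // REPL, and ClassLoaders/Classes are shared between objects.
            } else if (cls.isArray) {
              if (!cls.getComponentType.isPrimitive) {
                obj.asInstanceOf[Array[AnyRef]].foreach(stack.push)
              }
            } else {
              // Follow every non-static reference field, up the superclass chain.
              var c: Class[_] = cls
              while (c != null) {
                for (f <- c.getDeclaredFields
                     if !Modifier.isStatic(f.getModifiers) && !f.getType.isPrimitive) {
                  f.setAccessible(true)
                  stack.push(f.get(obj))
                }
                c = c.getSuperclass
              }
            }
          }
        }
        count
      }
    }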
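
The switch from MEMORY_ONLY to MEMORY_AND_DISK addresses the second bug: under
MEMORY_ONLY, a broadcast value evicted from the cache was simply gone, and in
local mode there was no server to re-fetch it from. The dropFromMemory hunk at
the end of the patch shows why the disk bit saves it; a sketch of that
outcome, assuming the StorageLevel constructor order visible in the patch
(useDisk, useMemory, deserialized, replication):

    import spark.storage.StorageLevel

    // What dropFromMemory does to a block's level: clear the memory bit,
    // keep everything else.
    val before = StorageLevel.MEMORY_AND_DISK
    val afterDrop = new StorageLevel(before.useDisk, false, before.deserialized, before.replication)
    // afterDrop still has useDisk == true, so the broadcast value can be
    // reloaded from disk; a MEMORY_ONLY block would have had no home left.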
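
Finally, the two bytes.rewind() calls in BlockManager.scala fix a classic
java.nio pitfall: reading a ByteBuffer out advances its position to the limit,
so when a block is stored at a level that uses both memory and disk, the
second store would otherwise see an empty buffer. A standalone illustration:

    import java.nio.ByteBuffer

    val bytes = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
    val sink = new Array[Byte](bytes.remaining())
    bytes.get(sink)            // first consumer drains the buffer; position == limit
    println(bytes.remaining()) // 0 -- a second consumer would read nothing
    bytes.rewind()             // reset position to 0; the contents are untouched
    println(bytes.remaining()) // 4 -- safe to hand the buffer to the next store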