From 5f54bdf98b4d986e0b03d02ea318f6224cd190d2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 13 Jun 2012 20:49:00 -0400 Subject: Added shutdown for akka to SparkContext.stop(). Helps a little, but many testsuites still fail. --- core/src/main/scala/spark/SparkContext.scala | 1 + core/src/main/scala/spark/network/ConnectionManager.scala | 14 +++++++------- core/src/main/scala/spark/storage/BlockManager.scala | 1 + core/src/main/scala/spark/storage/BlockManagerMaster.scala | 6 +++++- core/src/main/scala/spark/storage/BlockStore.scala | 1 + sbt/sbt | 2 +- 6 files changed, 16 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index eae71571db..b43aca2b97 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -260,6 +260,7 @@ class SparkContext( // Stop the SparkContext def stop() { + remote.shutdownServerModule() dagScheduler.stop() dagScheduler = null taskScheduler = null diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index f0b942c492..3222187990 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -334,14 +334,14 @@ class ConnectionManager(port: Int) extends Logging { selectorThread.interrupt() selectorThread.join() selector.close() + val connections = connectionsByKey.values + connections.foreach(_.close()) + if (connectionsByKey.size != 0) { + logWarning("All connections not cleaned up") + } + handleMessageExecutor.shutdown() + logInfo("ConnectionManager stopped") } - val connections = connectionsByKey.values - connections.foreach(_.close()) - if (connectionsByKey.size != 0) { - logWarning("All connections not cleaned up") - } - handleMessageExecutor.shutdown() - logInfo("ConnectionManager stopped") } } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 999bbc2128..9e4816f7ce 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -573,6 +573,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging blockInfo.clear() memoryStore.clear() diskStore.clear() + logInfo("BlockManager stopped") } } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 85edbbe0cd..d8400a1f65 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -352,7 +352,11 @@ object BlockManagerMaster extends Logging { } def stopBlockManagerMaster() { - if (masterActor != null) masterActor.stop() + if (masterActor != null) { + masterActor.stop() + masterActor = null + logInfo("BlockManagerMaster stopped") + } } def notifyADeadHost(host: String) { diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index 52f2cc32e8..8672a5376e 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -125,6 +125,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long) memoryStore.clear() } blockDropper.shutdown() + logInfo("MemoryStore cleared") } private def drop(blockId: String) { diff --git a/sbt/sbt b/sbt/sbt index 714e3d15d7..fab9967286 100755 --- a/sbt/sbt +++ b/sbt/sbt @@ -4,4 +4,4 @@ if [ "$MESOS_HOME" != "" ]; then EXTRA_ARGS="-Djava.library.path=$MESOS_HOME/lib/java" fi export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd) -java -Xmx800M -XX:MaxPermSize=150m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@" +java -Xmx1200M -XX:MaxPermSize=200m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@" -- cgit v1.2.3