diff options
author | Kousuke Saruta <sarutak@oss.nttdata.co.jp> | 2014-09-03 18:42:01 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-09-03 18:42:01 -0700 |
commit | 4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9 (patch) | |
tree | 26a3c472c95ba7d5e53792a428236b887eae488f | |
parent | e08ea7393df46567f552aa67c60a690c231775e4 (diff) | |
download | spark-4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9.tar.gz spark-4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9.tar.bz2 spark-4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9.zip |
[SPARK-3233] Executor never stop its SparnEnv, BlockManager, ConnectionManager etc.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #2138 from sarutak/SPARK-3233 and squashes the following commits:
c0205b7 [Kousuke Saruta] Merge branch 'SPARK-3233' of github.com:sarutak/spark into SPARK-3233
064679d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
d3005fd [Kousuke Saruta] Modified Class definition format of BlockManagerMaster
039b747 [Kousuke Saruta] Modified style
889e2d1 [Kousuke Saruta] Modified BlockManagerMaster to be able to be past isDriver flag
4da8535 [Kousuke Saruta] Modified BlockManagerMaster#stop to send StopBlockManagerMaster message when sender is Driver
6518c3a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
d5ab19a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
6bce25c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
6058a58 [Kousuke Saruta] Modified Executor not to invoke SparkEnv#stop in local mode
e5ad9d3 [Kousuke Saruta] Modified Executor to stop SparnEnv at the end of itself
6 files changed, 13 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 72716567ca..2973d002cc 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -225,7 +225,7 @@ object SparkEnv extends Logging { val blockManagerMaster = new BlockManagerMaster(registerOrLookup( "BlockManagerMaster", - new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf) + new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver) val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf, securityManager, mapOutputTracker, shuffleManager) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d7d19f6fa3..dd903dc65d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -123,6 +123,9 @@ private[spark] class Executor( env.metricsSystem.report() isStopped = true threadPool.shutdown() + if (!isLocal) { + env.stop() + } } class TaskRunner( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index e67b3dc5ce..2e262594b3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -27,7 +27,11 @@ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.AkkaUtils private[spark] -class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging { +class BlockManagerMaster( + var driverActor: ActorRef, + conf: SparkConf, + isDriver: Boolean) + extends Logging { private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf) private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf) @@ -196,7 +200,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log /** Stop the driver actor, called only on the Spark driver node */ def stop() { - if (driverActor != null) { + if (driverActor != null && isDriver) { tell(StopBlockManagerMaster) driverActor = null logInfo("BlockManagerMaster stopped") diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index aa83ea90ee..7540f0d5e2 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -99,7 +99,7 @@ private[spark] object ThreadingTest { val serializer = new KryoSerializer(conf) val blockManagerMaster = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), - conf) + conf, true) val blockManager = new BlockManager( "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf, new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 1a42fc1b23..0bb91febde 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -120,7 +120,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] // stub out BlockManagerMaster.getLocations to use our cacheLocations - val blockManagerMaster = new BlockManagerMaster(null, conf) { + val blockManagerMaster = new BlockManagerMaster(null, conf, true) { override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { blockIds.map { _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)). diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 14ffadab99..c200654162 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -93,7 +93,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter master = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), - conf) + conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() |