diff options
6 files changed, 13 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 72716567ca..2973d002cc 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -225,7 +225,7 @@ object SparkEnv extends Logging { val blockManagerMaster = new BlockManagerMaster(registerOrLookup( "BlockManagerMaster", - new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf) + new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver) val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf, securityManager, mapOutputTracker, shuffleManager) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d7d19f6fa3..dd903dc65d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -123,6 +123,9 @@ private[spark] class Executor( env.metricsSystem.report() isStopped = true threadPool.shutdown() + if (!isLocal) { + env.stop() + } } class TaskRunner( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index e67b3dc5ce..2e262594b3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -27,7 +27,11 @@ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.AkkaUtils private[spark] -class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging { +class BlockManagerMaster( + var driverActor: ActorRef, + conf: SparkConf, + isDriver: Boolean) + extends Logging { private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf) private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf) @@ -196,7 +200,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log /** Stop the driver actor, called only on the Spark driver node */ def stop() { - if (driverActor != null) { + if (driverActor != null && isDriver) { tell(StopBlockManagerMaster) driverActor = null logInfo("BlockManagerMaster stopped") diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index aa83ea90ee..7540f0d5e2 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -99,7 +99,7 @@ private[spark] object ThreadingTest { val serializer = new KryoSerializer(conf) val blockManagerMaster = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), - conf) + conf, true) val blockManager = new BlockManager( "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf, new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 1a42fc1b23..0bb91febde 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -120,7 +120,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] // stub out BlockManagerMaster.getLocations to use our cacheLocations - val blockManagerMaster = new BlockManagerMaster(null, conf) { + val blockManagerMaster = new BlockManagerMaster(null, conf, true) { override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { blockIds.map { _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)). diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 14ffadab99..c200654162 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -93,7 +93,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter master = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), - conf) + conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() |