aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2014-09-03 18:42:01 -0700
committerAndrew Or <andrewor14@gmail.com>2014-09-03 18:42:01 -0700
commit4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9 (patch)
tree26a3c472c95ba7d5e53792a428236b887eae488f /core
parente08ea7393df46567f552aa67c60a690c231775e4 (diff)
downloadspark-4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9.tar.gz
spark-4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9.tar.bz2
spark-4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9.zip
[SPARK-3233] Executor never stop its SparnEnv, BlockManager, ConnectionManager etc.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #2138 from sarutak/SPARK-3233 and squashes the following commits: c0205b7 [Kousuke Saruta] Merge branch 'SPARK-3233' of github.com:sarutak/spark into SPARK-3233 064679d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233 d3005fd [Kousuke Saruta] Modified Class definition format of BlockManagerMaster 039b747 [Kousuke Saruta] Modified style 889e2d1 [Kousuke Saruta] Modified BlockManagerMaster to be able to be past isDriver flag 4da8535 [Kousuke Saruta] Modified BlockManagerMaster#stop to send StopBlockManagerMaster message when sender is Driver 6518c3a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233 d5ab19a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233 6bce25c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233 6058a58 [Kousuke Saruta] Modified Executor not to invoke SparkEnv#stop in local mode e5ad9d3 [Kousuke Saruta] Modified Executor to stop SparnEnv at the end of itself
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala2
6 files changed, 13 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 72716567ca..2973d002cc 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -225,7 +225,7 @@ object SparkEnv extends Logging {
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
- new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
+ new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, mapOutputTracker, shuffleManager)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d7d19f6fa3..dd903dc65d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -123,6 +123,9 @@ private[spark] class Executor(
env.metricsSystem.report()
isStopped = true
threadPool.shutdown()
+ if (!isLocal) {
+ env.stop()
+ }
}
class TaskRunner(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e67b3dc5ce..2e262594b3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -27,7 +27,11 @@ import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark]
-class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
+class BlockManagerMaster(
+ var driverActor: ActorRef,
+ conf: SparkConf,
+ isDriver: Boolean)
+ extends Logging {
private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf)
private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf)
@@ -196,7 +200,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
/** Stop the driver actor, called only on the Spark driver node */
def stop() {
- if (driverActor != null) {
+ if (driverActor != null && isDriver) {
tell(StopBlockManagerMaster)
driverActor = null
logInfo("BlockManagerMaster stopped")
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index aa83ea90ee..7540f0d5e2 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -99,7 +99,7 @@ private[spark] object ThreadingTest {
val serializer = new KryoSerializer(conf)
val blockManagerMaster = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
- conf)
+ conf, true)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 1a42fc1b23..0bb91febde 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -120,7 +120,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
*/
val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
// stub out BlockManagerMaster.getLocations to use our cacheLocations
- val blockManagerMaster = new BlockManagerMaster(null, conf) {
+ val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
blockIds.map {
_.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 14ffadab99..c200654162 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -93,7 +93,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
master = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
- conf)
+ conf, true)
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()