 core/src/main/scala/org/apache/spark/SparkContext.scala                       | 10
 core/src/main/scala/org/apache/spark/executor/Executor.scala                  |  5
 core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala          | 43
 core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala         |  4
 core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 12
 core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala       |  7
 core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala  |  7
 project/MimaExcludes.scala                                                    |  8
 8 files changed, 29 insertions(+), 67 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2c10779f2b..b030d3c71d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,20 +48,20 @@ import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
FixedLengthBinaryInputFormat}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
+import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
import org.apache.spark.ui.{SparkUI, ConsoleProgressBar}
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.util._
@@ -619,11 +619,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (executorId == SparkContext.DRIVER_IDENTIFIER) {
Some(Utils.getThreadDump())
} else {
- val (host, port) = env.blockManager.master.getRpcHostPortForExecutor(executorId).get
- val endpointRef = env.rpcEnv.setupEndpointRef(
- SparkEnv.executorActorSystemName,
- RpcAddress(host, port),
- ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
+ val endpointRef = env.blockManager.master.getExecutorEndpointRef(executorId).get
Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
}
} catch {
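
Net effect in SparkContext: instead of fetching the executor's host and port, building an RpcAddress, and setting up an endpoint ref by name, the driver now asks the BlockManagerMaster for a ready-to-use ref in one call. A minimal sketch of the resulting flow, using only names introduced in this patch (the helper name threadDumpFor is ours for illustration, and the surrounding try/catch is elided):

    // Sketch: driver-side thread dump request after this patch.
    def threadDumpFor(executorId: String): Option[Array[ThreadStackTrace]] = {
      if (executorId == SparkContext.DRIVER_IDENTIFIER) {
        Some(Utils.getThreadDump())  // the driver dumps its own threads locally
      } else {
        // Single lookup: the master already holds a live RpcEndpointRef
        // for every executor's BlockManagerSlaveEndpoint.
        env.blockManager.master.getExecutorEndpointRef(executorId).map { ref =>
          ref.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump)
        }
      }
    }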
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9e88d488c0..6154f06e3a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -85,10 +85,6 @@ private[spark] class Executor(
env.blockManager.initialize(conf.getAppId)
}
- // Create an RpcEndpoint for receiving RPCs from the driver
- private val executorEndpoint = env.rpcEnv.setupEndpoint(
- ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME, new ExecutorEndpoint(env.rpcEnv, executorId))
-
// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
@@ -136,7 +132,6 @@ private[spark] class Executor(
def stop(): Unit = {
env.metricsSystem.report()
- env.rpcEnv.stop(executorEndpoint)
heartbeater.shutdown()
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
threadPool.shutdown()
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala
deleted file mode 100644
index cf362f8464..0000000000
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import org.apache.spark.rpc.{RpcEnv, RpcCallContext, RpcEndpoint}
-import org.apache.spark.util.Utils
-
-/**
- * Driver -> Executor message to trigger a thread dump.
- */
-private[spark] case object TriggerThreadDump
-
-/**
- * [[RpcEndpoint]] that runs inside of executors to enable driver -> executor RPC.
- */
-private[spark]
-class ExecutorEndpoint(override val rpcEnv: RpcEnv, executorId: String) extends RpcEndpoint {
-
- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case TriggerThreadDump =>
- context.reply(Utils.getThreadDump())
- }
-
-}
-
-object ExecutorEndpoint {
- val EXECUTOR_ENDPOINT_NAME = "ExecutorEndpoint"
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index f45bff34d4..440c4c18aa 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -87,8 +87,8 @@ class BlockManagerMaster(
driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
}
- def getRpcHostPortForExecutor(executorId: String): Option[(String, Int)] = {
- driverEndpoint.askWithRetry[Option[(String, Int)]](GetRpcHostPortForExecutor(executorId))
+ def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
+ driverEndpoint.askWithRetry[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 7db6035553..41892b4ffc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -19,7 +19,6 @@ package org.apache.spark.storage
import java.util.{HashMap => JHashMap}
-import scala.collection.immutable.HashSet
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
@@ -75,8 +74,8 @@ class BlockManagerMasterEndpoint(
case GetPeers(blockManagerId) =>
context.reply(getPeers(blockManagerId))
- case GetRpcHostPortForExecutor(executorId) =>
- context.reply(getRpcHostPortForExecutor(executorId))
+ case GetExecutorEndpointRef(executorId) =>
+ context.reply(getExecutorEndpointRef(executorId))
case GetMemoryStatus =>
context.reply(memoryStatus)
@@ -388,15 +387,14 @@ class BlockManagerMasterEndpoint(
}
/**
- * Returns the hostname and port of an executor, based on the [[RpcEnv]] address of its
- * [[BlockManagerSlaveEndpoint]].
+ * Returns an [[RpcEndpointRef]] of the [[BlockManagerSlaveEndpoint]] for sending RPC messages.
*/
- private def getRpcHostPortForExecutor(executorId: String): Option[(String, Int)] = {
+ private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
for (
blockManagerId <- blockManagerIdByExecutor.get(executorId);
info <- blockManagerInfo.get(blockManagerId)
) yield {
- (info.slaveEndpoint.address.host, info.slaveEndpoint.address.port)
+ info.slaveEndpoint
}
}
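
The for-comprehension above chains two Option-returning lookups (executor id to BlockManagerId, then BlockManagerId to BlockManagerInfo), so a miss at either step yields None rather than throwing. A desugared equivalent, shown purely for clarity:

    // Sketch: the same lookup written with explicit flatMap/map.
    private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
      blockManagerIdByExecutor.get(executorId).flatMap { blockManagerId =>
        blockManagerInfo.get(blockManagerId).map(info => info.slaveEndpoint)
      }
    }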
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 376e9eb488..f392a4a0cd 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -42,6 +42,11 @@ private[spark] object BlockManagerMessages {
case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
extends ToBlockManagerSlave
+ /**
+ * Driver -> Executor message to trigger a thread dump.
+ */
+ case object TriggerThreadDump extends ToBlockManagerSlave
+
//////////////////////////////////////////////////////////////////////////////////
// Messages from slaves to the master.
//////////////////////////////////////////////////////////////////////////////////
@@ -90,7 +95,7 @@ private[spark] object BlockManagerMessages {
case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
- case class GetRpcHostPortForExecutor(executorId: String) extends ToBlockManagerMaster
+ case class GetExecutorEndpointRef(executorId: String) extends ToBlockManagerMaster
case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
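
Extending ToBlockManagerSlave is the only wiring a driver-to-executor command needs: anything of that type can be sent to a slave endpoint ref, and handling it means adding one case to BlockManagerSlaveEndpoint.receiveAndReply, as this patch does below. Purely as an illustration (not part of this patch), a further command would take the same shape:

    // Hypothetical example only: another slave-bound message following
    // the TriggerThreadDump pattern.
    case object TriggerHeapHistogram extends ToBlockManagerSlave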
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index e749631bf6..9eca902f74 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -19,10 +19,10 @@ package org.apache.spark.storage
import scala.concurrent.{ExecutionContext, Future}
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext, RpcEndpoint}
-import org.apache.spark.util.ThreadUtils
import org.apache.spark.{Logging, MapOutputTracker, SparkEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* An RpcEndpoint to take commands from the master to execute options. For example,
@@ -70,6 +70,9 @@ class BlockManagerSlaveEndpoint(
case GetMatchingBlockIds(filter, _) =>
context.reply(blockManager.getMatchingBlockIds(filter))
+
+ case TriggerThreadDump =>
+ context.reply(Utils.getThreadDump())
}
private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
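
Note that TriggerThreadDump replies synchronously, unlike the Remove* messages in this endpoint, which route through doAsync: a thread dump is a cheap in-memory snapshot, so there is no Future to manage. A sketch contrasting the two reply styles (the RemoveBlock body is abbreviated from the existing upstream handler):

    // Synchronous: compute and answer on the calling thread.
    case TriggerThreadDump =>
      context.reply(Utils.getThreadDump())

    // Asynchronous: doAsync runs the body on a dedicated ExecutionContext
    // and replies once the Future completes.
    case RemoveBlock(blockId) =>
      doAsync[Boolean]("removing block " + blockId, context) {
        blockManager.removeBlock(blockId)
        true
      }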
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 54a9ad956d..566bfe8efb 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -147,6 +147,14 @@ object MimaExcludes {
// SPARK-4557 Changed foreachRDD to use VoidFunction
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.api.java.JavaDStreamLike.foreachRDD")
+ ) ++ Seq(
+ // SPARK-11996 Make the executor thread dump work again
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.ExecutorEndpoint"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.ExecutorEndpoint$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor$")
)
case v if v.startsWith("1.5") =>
Seq(
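
The paired entries with and without a trailing $ in the exclude list reflect how scalac lays objects out on the JVM: every object (including case objects and companions) compiles to an extra class whose name carries a $ suffix, so MiMa tracks it as a separate binary artifact. Under standard Scala 2.x compilation:

    class ExecutorEndpoint   // -> ExecutorEndpoint.class
    object ExecutorEndpoint  // -> ExecutorEndpoint$.class; the singleton
                             //    instance lives in its static MODULE$ field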