aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala148
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala119
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala313
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala28
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala20
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala6
20 files changed, 394 insertions, 331 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index d65c94e410..16072283ed 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -106,7 +106,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
*/
protected def askTracker[T: ClassTag](message: Any): T = {
try {
- trackerEndpoint.askWithReply[T](message)
+ trackerEndpoint.askWithRetry[T](message)
} catch {
case e: Exception =>
logError("Error communicating with MapOutputTracker", e)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d0cf2a8dd0..5ae8fb81de 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -555,7 +555,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
SparkEnv.executorActorSystemName,
RpcAddress(host, port),
ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
- Some(endpointRef.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump))
+ Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
}
} catch {
case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8af46f3327..79aed90b53 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -57,7 +57,7 @@ private[spark] class CoarseGrainedExecutorBackend(
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
driver = Some(ref)
- ref.sendWithReply[RegisteredExecutor.type](
+ ref.ask[RegisteredExecutor.type](
RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
} onComplete {
case Success(msg) => Utils.tryLogNonFatalError {
@@ -154,7 +154,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
executorConf,
new SecurityManager(executorConf))
val driver = fetcher.setupEndpointRefByURI(driverUrl)
- val props = driver.askWithReply[Seq[(String, String)]](RetrieveSparkProps) ++
+ val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dd1c48e6cb..8f916e0502 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -441,7 +441,7 @@ private[spark] class Executor(
val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
try {
- val response = heartbeatReceiverRef.askWithReply[HeartbeatResponse](message)
+ val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message)
if (response.reregisterBlockManager) {
logWarning("Told to re-register on heartbeat")
env.blockManager.reregister()
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala b/core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala
new file mode 100644
index 0000000000..3e5b64265e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+/**
+ * A callback that an [[RpcEndpoint]] can use to send back a message or a failure. It's
+ * thread-safe and can be called in any thread.
+ */
+private[spark] trait RpcCallContext {
+
+ /**
+ * Reply to the sender with a message. If the sender is an [[RpcEndpoint]], its
+ * [[RpcEndpoint.receive]] will be called.
+ */
+ def reply(response: Any): Unit
+
+ /**
+ * Report a failure to the sender.
+ */
+ def sendFailure(e: Throwable): Unit
+
+ /**
+ * The sender of this message.
+ */
+ def sender: RpcEndpointRef
+}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
new file mode 100644
index 0000000000..d2b2baef1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import org.apache.spark.SparkException
+
+/**
+ * A factory to create an [[RpcEnv]]. Implementations must have a no-argument constructor so that
+ * they can be instantiated via reflection.
+ */
+private[spark] trait RpcEnvFactory {
+
+ def create(config: RpcEnvConfig): RpcEnv
+}
+
+/**
+ * A trait that requires [[RpcEnv]] to deliver messages to it thread-safely.
+ *
+ * Thread-safety means processing of one message happens before processing of the next message by
+ * the same [[ThreadSafeRpcEndpoint]]. In other words, changes to internal fields of a
+ * [[ThreadSafeRpcEndpoint]] are visible when processing the next message, and fields in the
+ * [[ThreadSafeRpcEndpoint]] need not be volatile or equivalent.
+ *
+ * However, there is no guarantee that the same thread will be executing the same
+ * [[ThreadSafeRpcEndpoint]] for different messages.
+ */
+private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
+
+
+/**
+ * An end point for the RPC that defines what functions to trigger given a message.
+ *
+ * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
+ *
+ * The life-cycle will be:
+ *
+ * constructor onStart receive* onStop
+ *
+ * Note: `receive` can be called concurrently. If `receive` must be thread-safe, please use
+ * [[ThreadSafeRpcEndpoint]].
+ *
+ * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
+ * invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
+ */
+private[spark] trait RpcEndpoint {
+
+ /**
+ * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
+ */
+ val rpcEnv: RpcEnv
+
+ /**
+ * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
+ * called. And `self` will become `null` when `onStop` is called.
+ *
+ * Note: before `onStart`, [[RpcEndpoint]] has not yet been registered and there is no valid
+ * [[RpcEndpointRef]] for it, so don't call `self` before `onStart` is called.
+ */
+ final def self: RpcEndpointRef = {
+ require(rpcEnv != null, "rpcEnv has not been initialized")
+ rpcEnv.endpointRef(this)
+ }
+
+ /**
+ * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving an
+ * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
+ */
+ def receive: PartialFunction[Any, Unit] = {
+ case _ => throw new SparkException(self + " does not implement 'receive'")
+ }
+
+ /**
+ * Process messages from [[RpcEndpointRef.ask]]. If receiving an unmatched message,
+ * [[SparkException]] will be thrown and sent to `onError`.
+ */
+ def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
+ }
+
+ /**
+ * Invoked when any exception is thrown while handling messages.
+ */
+ def onError(cause: Throwable): Unit = {
+ // By default, rethrow `cause` and let RpcEnv handle it
+ throw cause
+ }
+
+ /**
+ * Invoked before [[RpcEndpoint]] starts to handle any message.
+ */
+ def onStart(): Unit = {
+ // By default, do nothing.
+ }
+
+ /**
+ * Invoked when [[RpcEndpoint]] is stopping.
+ */
+ def onStop(): Unit = {
+ // By default, do nothing.
+ }
+
+ /**
+ * Invoked when `remoteAddress` is connected to the current node.
+ */
+ def onConnected(remoteAddress: RpcAddress): Unit = {
+ // By default, do nothing.
+ }
+
+ /**
+ * Invoked when `remoteAddress` is lost.
+ */
+ def onDisconnected(remoteAddress: RpcAddress): Unit = {
+ // By default, do nothing.
+ }
+
+ /**
+ * Invoked when some network error happens in the connection between the current node and
+ * `remoteAddress`.
+ */
+ def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
+ // By default, do nothing.
+ }
+
+ /**
+ * A convenience method to stop this [[RpcEndpoint]].
+ */
+ final def stop(): Unit = {
+ val _self = self
+ if (_self != null) {
+ rpcEnv.stop(_self)
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
new file mode 100644
index 0000000000..69181edb9a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.FiniteDuration
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.RpcUtils
+import org.apache.spark.{SparkException, Logging, SparkConf}
+
+/**
+ * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
+ */
+private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
+ extends Serializable with Logging {
+
+ private[this] val maxRetries = RpcUtils.numRetries(conf)
+ private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
+ private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
+
+ /**
+ * Return the address of this [[RpcEndpointRef]].
+ */
+ def address: RpcAddress
+
+ def name: String
+
+ /**
+ * Sends a one-way asynchronous message. Fire-and-forget semantics.
+ */
+ def send(message: Any): Unit
+
+ /**
+ * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and return a [[Future]] to
+ * receive the reply within the specified timeout.
+ *
+ * This method only sends the message once and never retries.
+ */
+ def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
+
+ /**
+ * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and return a [[Future]] to
+ * receive the reply within a default timeout.
+ *
+ * This method only sends the message once and never retries.
+ */
+ def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
+
+ /**
+ * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
+ * timeout, or throw a SparkException if this fails even after the default number of retries.
+ * The default `timeout` will be used in every trial of calling `ask`. Because this
+ * method retries, the message handling in the receiver side should be idempotent.
+ *
+ * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+ * loop of [[RpcEndpoint]].
+ *
+ * @param message the message to send
+ * @tparam T type of the reply message
+ * @return the reply message from the corresponding [[RpcEndpoint]]
+ */
+ def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
+
+ /**
+ * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within
+ * a specified timeout, or throw a SparkException if this fails even after the configured number
+ * of retries. `timeout` will be used in every trial of calling `ask`. Because this method
+ * retries, the message handling in the receiver side should be idempotent.
+ *
+ * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+ * loop of [[RpcEndpoint]].
+ *
+ * @param message the message to send
+ * @param timeout the timeout duration
+ * @tparam T type of the reply message
+ * @return the reply message from the corresponding [[RpcEndpoint]]
+ */
+ def askWithRetry[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
+ // TODO: Consider removing multiple attempts
+ var attempts = 0
+ var lastException: Exception = null
+ while (attempts < maxRetries) {
+ attempts += 1
+ try {
+ val future = ask[T](message, timeout)
+ val result = Await.result(future, timeout)
+ if (result == null) {
+ throw new SparkException("Actor returned null")
+ }
+ return result
+ } catch {
+ case ie: InterruptedException => throw ie
+ case e: Exception =>
+ lastException = e
+ logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
+ }
+ if (attempts < maxRetries) Thread.sleep(retryWaitMs) // no pointless wait after the last attempt
+ }
+
+ throw new SparkException(
+ s"Error sending message [message = $message]", lastException)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index a5336b7563..12b6b28d4d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -20,13 +20,41 @@ package org.apache.spark.rpc
import java.net.URI
import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
import scala.language.postfixOps
-import scala.reflect.ClassTag
-import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.util.{RpcUtils, Utils}
+
+/**
+ * An RpcEnv implementation must have an [[RpcEnvFactory]] implementation with a no-argument
+ * constructor so that it can be created via reflection.
+ */
+private[spark] object RpcEnv {
+
+ private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
+ // Add more RpcEnv implementations here; unknown names are used as the factory class name
+ val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory")
+ val rpcEnvName = conf.get("spark.rpc", "akka")
+ val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
+ Class.forName(rpcEnvFactoryClassName, true, Utils.getContextOrSparkClassLoader).
+ newInstance().asInstanceOf[RpcEnvFactory]
+ }
+
+ def create(
+ name: String,
+ host: String,
+ port: Int,
+ conf: SparkConf,
+ securityManager: SecurityManager): RpcEnv = {
+ // Use reflection to create the RpcEnv to avoid depending on Akka directly
+ val config = RpcEnvConfig(conf, name, host, port, securityManager)
+ getRpcEnvFactory(conf).create(config)
+ }
+
+}
+
+
/**
* An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
* receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
@@ -112,6 +140,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
def uriOf(systemName: String, address: RpcAddress, endpointName: String): String
}
+
private[spark] case class RpcEnvConfig(
conf: SparkConf,
name: String,
@@ -119,261 +148,9 @@ private[spark] case class RpcEnvConfig(
port: Int,
securityManager: SecurityManager)
-/**
- * A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
- * so that it can be created via Reflection.
- */
-private[spark] object RpcEnv {
-
- private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
- // Add more RpcEnv implementations here
- val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory")
- val rpcEnvName = conf.get("spark.rpc", "akka")
- val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
- Class.forName(rpcEnvFactoryClassName, true, Utils.getContextOrSparkClassLoader).
- newInstance().asInstanceOf[RpcEnvFactory]
- }
-
- def create(
- name: String,
- host: String,
- port: Int,
- conf: SparkConf,
- securityManager: SecurityManager): RpcEnv = {
- // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
- val config = RpcEnvConfig(conf, name, host, port, securityManager)
- getRpcEnvFactory(conf).create(config)
- }
-
-}
-
-/**
- * A factory class to create the [[RpcEnv]]. It must have an empty constructor so that it can be
- * created using Reflection.
- */
-private[spark] trait RpcEnvFactory {
-
- def create(config: RpcEnvConfig): RpcEnv
-}
/**
- * An end point for the RPC that defines what functions to trigger given a message.
- *
- * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
- *
- * The lift-cycle will be:
- *
- * constructor onStart receive* onStop
- *
- * Note: `receive` can be called concurrently. If you want `receive` is thread-safe, please use
- * [[ThreadSafeRpcEndpoint]]
- *
- * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
- * invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
- */
-private[spark] trait RpcEndpoint {
-
- /**
- * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
- */
- val rpcEnv: RpcEnv
-
- /**
- * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
- * called. And `self` will become `null` when `onStop` is called.
- *
- * Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
- * valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
- */
- final def self: RpcEndpointRef = {
- require(rpcEnv != null, "rpcEnv has not been initialized")
- rpcEnv.endpointRef(this)
- }
-
- /**
- * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
- * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
- */
- def receive: PartialFunction[Any, Unit] = {
- case _ => throw new SparkException(self + " does not implement 'receive'")
- }
-
- /**
- * Process messages from [[RpcEndpointRef.sendWithReply]]. If receiving a unmatched message,
- * [[SparkException]] will be thrown and sent to `onError`.
- */
- def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
- }
-
- /**
- * Call onError when any exception is thrown during handling messages.
- *
- * @param cause
- */
- def onError(cause: Throwable): Unit = {
- // By default, throw e and let RpcEnv handle it
- throw cause
- }
-
- /**
- * Invoked before [[RpcEndpoint]] starts to handle any message.
- */
- def onStart(): Unit = {
- // By default, do nothing.
- }
-
- /**
- * Invoked when [[RpcEndpoint]] is stopping.
- */
- def onStop(): Unit = {
- // By default, do nothing.
- }
-
- /**
- * Invoked when `remoteAddress` is connected to the current node.
- */
- def onConnected(remoteAddress: RpcAddress): Unit = {
- // By default, do nothing.
- }
-
- /**
- * Invoked when `remoteAddress` is lost.
- */
- def onDisconnected(remoteAddress: RpcAddress): Unit = {
- // By default, do nothing.
- }
-
- /**
- * Invoked when some network error happens in the connection between the current node and
- * `remoteAddress`.
- */
- def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
- // By default, do nothing.
- }
-
- /**
- * A convenient method to stop [[RpcEndpoint]].
- */
- final def stop(): Unit = {
- val _self = self
- if (_self != null) {
- rpcEnv.stop(_self)
- }
- }
-}
-
-/**
- * A trait that requires RpcEnv thread-safely sending messages to it.
- *
- * Thread-safety means processing of one message happens before processing of the next message by
- * the same [[ThreadSafeRpcEndpoint]]. In the other words, changes to internal fields of a
- * [[ThreadSafeRpcEndpoint]] are visible when processing the next message, and fields in the
- * [[ThreadSafeRpcEndpoint]] need not be volatile or equivalent.
- *
- * However, there is no guarantee that the same thread will be executing the same
- * [[ThreadSafeRpcEndpoint]] for different messages.
- */
-trait ThreadSafeRpcEndpoint extends RpcEndpoint
-
-/**
- * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
- */
-private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
- extends Serializable with Logging {
-
- private[this] val maxRetries = RpcUtils.numRetries(conf)
- private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
- private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
-
- /**
- * return the address for the [[RpcEndpointRef]]
- */
- def address: RpcAddress
-
- def name: String
-
- /**
- * Sends a one-way asynchronous message. Fire-and-forget semantics.
- */
- def send(message: Any): Unit
-
- /**
- * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a `Future` to
- * receive the reply within a default timeout.
- *
- * This method only sends the message once and never retries.
- */
- def sendWithReply[T: ClassTag](message: Any): Future[T] =
- sendWithReply(message, defaultAskTimeout)
-
- /**
- * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a `Future` to
- * receive the reply within the specified timeout.
- *
- * This method only sends the message once and never retries.
- */
- def sendWithReply[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
-
- /**
- * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
- * timeout, or throw a SparkException if this fails even after the default number of retries.
- * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
- * method retries, the message handling in the receiver side should be idempotent.
- *
- * Note: this is a blocking action which may cost a lot of time, so don't call it in an message
- * loop of [[RpcEndpoint]].
- *
- * @param message the message to send
- * @tparam T type of the reply message
- * @return the reply message from the corresponding [[RpcEndpoint]]
- */
- def askWithReply[T: ClassTag](message: Any): T = askWithReply(message, defaultAskTimeout)
-
- /**
- * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
- * specified timeout, throw a SparkException if this fails even after the specified number of
- * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
- * retries, the message handling in the receiver side should be idempotent.
- *
- * Note: this is a blocking action which may cost a lot of time, so don't call it in an message
- * loop of [[RpcEndpoint]].
- *
- * @param message the message to send
- * @param timeout the timeout duration
- * @tparam T type of the reply message
- * @return the reply message from the corresponding [[RpcEndpoint]]
- */
- def askWithReply[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
- // TODO: Consider removing multiple attempts
- var attempts = 0
- var lastException: Exception = null
- while (attempts < maxRetries) {
- attempts += 1
- try {
- val future = sendWithReply[T](message, timeout)
- val result = Await.result(future, timeout)
- if (result == null) {
- throw new SparkException("Actor returned null")
- }
- return result
- } catch {
- case ie: InterruptedException => throw ie
- case e: Exception =>
- lastException = e
- logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
- }
- Thread.sleep(retryWaitMs)
- }
-
- throw new SparkException(
- s"Error sending message [message = $message]", lastException)
- }
-
-}
-
-/**
- * Represent a host with a port
+ * Represents a host and port.
*/
private[spark] case class RpcAddress(host: String, port: Int) {
// TODO do we need to add the type of RpcEnv in the address?
@@ -383,6 +160,7 @@ private[spark] case class RpcAddress(host: String, port: Int) {
override val toString: String = hostPort
}
+
private[spark] object RpcAddress {
/**
@@ -404,26 +182,3 @@ private[spark] object RpcAddress {
RpcAddress(host, port)
}
}
-
-/**
- * A callback that [[RpcEndpoint]] can use it to send back a message or failure. It's thread-safe
- * and can be called in any thread.
- */
-private[spark] trait RpcCallContext {
-
- /**
- * Reply a message to the sender. If the sender is [[RpcEndpoint]], its [[RpcEndpoint.receive]]
- * will be called.
- */
- def reply(response: Any): Unit
-
- /**
- * Report a failure to the sender.
- */
- def sendFailure(e: Throwable): Unit
-
- /**
- * The sender of this message.
- */
- def sender: RpcEndpointRef
-}
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index 652e52f2b2..ba0d468f11 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -293,7 +293,7 @@ private[akka] class AkkaRpcEndpointRef(
actorRef ! AkkaMessage(message, false)
}
- override def sendWithReply[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
+ override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
import scala.concurrent.ExecutionContext.Implicits.global
actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
case msg @ AkkaMessage(message, reply) =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b7901c06a1..b511c306ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -167,7 +167,7 @@ class DAGScheduler(
taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics)
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
- blockManagerMaster.driverEndpoint.askWithReply[Boolean](
+ blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
BlockManagerHeartbeat(blockManagerId), 600 seconds)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 7c184b1dcb..0b1d47cff3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -85,7 +85,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
val msg = AskPermissionToCommitOutput(stage, partition, attempt)
coordinatorRef match {
case Some(endpointRef) =>
- endpointRef.askWithReply[Boolean](msg)
+ endpointRef.askWithRetry[Boolean](msg)
case None =>
logError(
"canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9656fb7685..7352fa1fe9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -252,7 +252,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
try {
if (driverEndpoint != null) {
logInfo("Shutting down all executors")
- driverEndpoint.askWithReply[Boolean](StopExecutors)
+ driverEndpoint.askWithRetry[Boolean](StopExecutors)
}
} catch {
case e: Exception =>
@@ -264,7 +264,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
stopExecutors()
try {
if (driverEndpoint != null) {
- driverEndpoint.askWithReply[Boolean](StopDriver)
+ driverEndpoint.askWithRetry[Boolean](StopDriver)
}
} catch {
case e: Exception =>
@@ -287,7 +287,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Called by subclasses when notified of a lost worker
def removeExecutor(executorId: String, reason: String) {
try {
- driverEndpoint.askWithReply[Boolean](RemoveExecutor(executorId, reason))
+ driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
} catch {
case e: Exception =>
throw new SparkException("Error notifying standalone scheduler's driver endpoint", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index d987c7d563..2a3a5d925d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -53,14 +53,14 @@ private[spark] abstract class YarnSchedulerBackend(
* This includes executors already pending or running.
*/
override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- yarnSchedulerEndpoint.askWithReply[Boolean](RequestExecutors(requestedTotal))
+ yarnSchedulerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal))
}
/**
* Request that the ApplicationMaster kill the specified executors.
*/
override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- yarnSchedulerEndpoint.askWithReply[Boolean](KillExecutors(executorIds))
+ yarnSchedulerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
}
override def sufficientResourcesRegistered(): Boolean = {
@@ -115,7 +115,7 @@ private[spark] abstract class YarnSchedulerBackend(
amEndpoint match {
case Some(am) =>
Future {
- context.reply(am.askWithReply[Boolean](r))
+ context.reply(am.askWithRetry[Boolean](r))
} onFailure {
case NonFatal(e) =>
logError(s"Sending $r to AM was unsuccessful", e)
@@ -130,7 +130,7 @@ private[spark] abstract class YarnSchedulerBackend(
amEndpoint match {
case Some(am) =>
Future {
- context.reply(am.askWithReply[Boolean](k))
+ context.reply(am.askWithRetry[Boolean](k))
} onFailure {
case NonFatal(e) =>
logError(s"Sending $k to AM was unsuccessful", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index ac5b524517..e64d06c4d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -123,7 +123,7 @@ private[spark] class LocalBackend(
}
override def stop() {
- localEndpoint.sendWithReply(StopExecutor)
+ localEndpoint.ask(StopExecutor)
}
override def reviveOffers() {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index c798843bd5..9bfc4201d3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -55,7 +55,7 @@ class BlockManagerMaster(
memSize: Long,
diskSize: Long,
tachyonSize: Long): Boolean = {
- val res = driverEndpoint.askWithReply[Boolean](
+ val res = driverEndpoint.askWithRetry[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
logDebug(s"Updated info of block $blockId")
res
@@ -63,12 +63,12 @@ class BlockManagerMaster(
/** Get locations of the blockId from the driver */
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
- driverEndpoint.askWithReply[Seq[BlockManagerId]](GetLocations(blockId))
+ driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
/** Get locations of multiple blockIds from the driver */
def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
- driverEndpoint.askWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
+ driverEndpoint.askWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
/**
@@ -81,11 +81,11 @@ class BlockManagerMaster(
/** Get ids of other nodes in the cluster from the driver */
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
- driverEndpoint.askWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
+ driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
}
def getRpcHostPortForExecutor(executorId: String): Option[(String, Int)] = {
- driverEndpoint.askWithReply[Option[(String, Int)]](GetRpcHostPortForExecutor(executorId))
+ driverEndpoint.askWithRetry[Option[(String, Int)]](GetRpcHostPortForExecutor(executorId))
}
/**
@@ -93,12 +93,12 @@ class BlockManagerMaster(
* blocks that the driver knows about.
*/
def removeBlock(blockId: BlockId) {
- driverEndpoint.askWithReply[Boolean](RemoveBlock(blockId))
+ driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
}
/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
- val future = driverEndpoint.askWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
+ val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
@@ -110,7 +110,7 @@ class BlockManagerMaster(
/** Remove all blocks belonging to the given shuffle. */
def removeShuffle(shuffleId: Int, blocking: Boolean) {
- val future = driverEndpoint.askWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
+ val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
@@ -122,7 +122,7 @@ class BlockManagerMaster(
/** Remove all blocks belonging to the given broadcast. */
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
- val future = driverEndpoint.askWithReply[Future[Seq[Int]]](
+ val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.onFailure {
case e: Exception =>
@@ -141,11 +141,11 @@ class BlockManagerMaster(
* amount of remaining memory.
*/
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
- driverEndpoint.askWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
+ driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
def getStorageStatus: Array[StorageStatus] = {
- driverEndpoint.askWithReply[Array[StorageStatus]](GetStorageStatus)
+ driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)
}
/**
@@ -166,7 +166,7 @@ class BlockManagerMaster(
* master endpoint for a response to a prior message.
*/
val response = driverEndpoint.
- askWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
+ askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
val (blockManagerIds, futures) = response.unzip
val result = Await.result(Future.sequence(futures), timeout)
if (result == null) {
@@ -190,7 +190,7 @@ class BlockManagerMaster(
filter: BlockId => Boolean,
askSlaves: Boolean): Seq[BlockId] = {
val msg = GetMatchingBlockIds(filter, askSlaves)
- val future = driverEndpoint.askWithReply[Future[Seq[BlockId]]](msg)
+ val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
Await.result(future, timeout)
}
@@ -205,7 +205,7 @@ class BlockManagerMaster(
/** Send a one-way message to the master endpoint, to which we expect it to reply with true. */
private def tell(message: Any) {
- if (!driverEndpoint.askWithReply[Boolean](message)) {
+ if (!driverEndpoint.askWithRetry[Boolean](message)) {
throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 4682167912..7212362df5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -132,7 +132,7 @@ class BlockManagerMasterEndpoint(
val removeMsg = RemoveRdd(rddId)
Future.sequence(
blockManagerInfo.values.map { bm =>
- bm.slaveEndpoint.sendWithReply[Int](removeMsg)
+ bm.slaveEndpoint.ask[Int](removeMsg)
}.toSeq
)
}
@@ -142,7 +142,7 @@ class BlockManagerMasterEndpoint(
val removeMsg = RemoveShuffle(shuffleId)
Future.sequence(
blockManagerInfo.values.map { bm =>
- bm.slaveEndpoint.sendWithReply[Boolean](removeMsg)
+ bm.slaveEndpoint.ask[Boolean](removeMsg)
}.toSeq
)
}
@@ -159,7 +159,7 @@ class BlockManagerMasterEndpoint(
}
Future.sequence(
requiredBlockManagers.map { bm =>
- bm.slaveEndpoint.sendWithReply[Int](removeMsg)
+ bm.slaveEndpoint.ask[Int](removeMsg)
}.toSeq
)
}
@@ -214,7 +214,7 @@ class BlockManagerMasterEndpoint(
// Remove the block from the slave's BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
- blockManager.get.slaveEndpoint.sendWithReply[Boolean](RemoveBlock(blockId))
+ blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
}
}
}
@@ -253,7 +253,7 @@ class BlockManagerMasterEndpoint(
blockManagerInfo.values.map { info =>
val blockStatusFuture =
if (askSlaves) {
- info.slaveEndpoint.sendWithReply[Option[BlockStatus]](getBlockStatus)
+ info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
} else {
Future { info.getStatus(blockId) }
}
@@ -277,7 +277,7 @@ class BlockManagerMasterEndpoint(
blockManagerInfo.values.map { info =>
val future =
if (askSlaves) {
- info.slaveEndpoint.sendWithReply[Seq[BlockId]](getMatchingBlockIds)
+ info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
} else {
Future { info.blocks.keys.filter(filter).toSeq }
}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 0fd570e529..b789912e9e 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -48,7 +48,7 @@ class HeartbeatReceiverSuite extends FunSuite with LocalSparkContext {
val metrics = new TaskMetrics
val blockManagerId = BlockManagerId("executor-1", "localhost", 12345)
- val response = receiverRef.askWithReply[HeartbeatResponse](
+ val response = receiverRef.askWithRetry[HeartbeatResponse](
Heartbeat("executor-1", Array(1L -> metrics), blockManagerId))
verify(scheduler).executorHeartbeatReceived(
@@ -71,7 +71,7 @@ class HeartbeatReceiverSuite extends FunSuite with LocalSparkContext {
val metrics = new TaskMetrics
val blockManagerId = BlockManagerId("executor-1", "localhost", 12345)
- val response = receiverRef.askWithReply[HeartbeatResponse](
+ val response = receiverRef.askWithRetry[HeartbeatResponse](
Heartbeat("executor-1", Array(1L -> metrics), blockManagerId))
verify(scheduler).executorHeartbeatReceived(
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 44c88b00c4..ae3339d80f 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -100,8 +100,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
}
val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
- val newRpcEndpointRef = rpcEndpointRef.askWithReply[RpcEndpointRef]("Hello")
- val reply = newRpcEndpointRef.askWithReply[String]("Echo")
+ val newRpcEndpointRef = rpcEndpointRef.askWithRetry[RpcEndpointRef]("Hello")
+ val reply = newRpcEndpointRef.askWithRetry[String]("Echo")
assert("Echo" === reply)
}
@@ -115,7 +115,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
}
}
})
- val reply = rpcEndpointRef.askWithReply[String]("hello")
+ val reply = rpcEndpointRef.askWithRetry[String]("hello")
assert("hello" === reply)
}
@@ -134,7 +134,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely")
try {
- val reply = rpcEndpointRef.askWithReply[String]("hello")
+ val reply = rpcEndpointRef.askWithRetry[String]("hello")
assert("hello" === reply)
} finally {
anotherEnv.shutdown()
@@ -162,7 +162,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
try {
val e = intercept[Exception] {
- rpcEndpointRef.askWithReply[String]("hello", 1 millis)
+ rpcEndpointRef.askWithRetry[String]("hello", 1 millis)
}
assert(e.isInstanceOf[TimeoutException] || e.getCause.isInstanceOf[TimeoutException])
} finally {
@@ -399,7 +399,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
}
})
- val f = endpointRef.sendWithReply[String]("Hi")
+ val f = endpointRef.ask[String]("Hi")
val ack = Await.result(f, 5 seconds)
assert("ack" === ack)
@@ -419,7 +419,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "sendWithReply-remotely")
try {
- val f = rpcEndpointRef.sendWithReply[String]("hello")
+ val f = rpcEndpointRef.ask[String]("hello")
val ack = Await.result(f, 5 seconds)
assert("ack" === ack)
} finally {
@@ -437,7 +437,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
}
})
- val f = endpointRef.sendWithReply[String]("Hi")
+ val f = endpointRef.ask[String]("Hi")
val e = intercept[SparkException] {
Await.result(f, 5 seconds)
}
@@ -460,7 +460,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "sendWithReply-remotely-error")
try {
- val f = rpcEndpointRef.sendWithReply[String]("hello")
+ val f = rpcEndpointRef.ask[String]("hello")
val e = intercept[SparkException] {
Await.result(f, 5 seconds)
}
@@ -529,7 +529,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "sendWithReply-unserializable-error")
try {
- val f = rpcEndpointRef.sendWithReply[String]("hello")
+ val f = rpcEndpointRef.ask[String]("hello")
intercept[TimeoutException] {
Await.result(f, 1 seconds)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6957bc72e9..f5b410f41d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -356,7 +356,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
- val reregister = !master.driverEndpoint.askWithReply[Boolean](
+ val reregister = !master.driverEndpoint.askWithRetry[Boolean](
BlockManagerHeartbeat(store.blockManagerId))
assert(reregister == true)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 89af40330b..f2379366f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
- trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo))
+ trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
@@ -169,13 +169,13 @@ private[streaming] class ReceiverSupervisorImpl(
override protected def onReceiverStart() {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
- trackerEndpoint.askWithReply[Boolean](msg)
+ trackerEndpoint.askWithRetry[Boolean](msg)
}
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
- trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString))
+ trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
logInfo("Stopped receiver " + streamId)
}