aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorjinxing <jinxing@meituan.com>2017-02-19 04:34:07 -0800
committerSean Owen <sowen@cloudera.com>2017-02-19 04:34:07 -0800
commitba8912e5f3d5c5a366cb3d1f6be91f2471d048d2 (patch)
tree0f8db3d54422c584fb03a492283792fff364e39b /streaming
parentdf3cbe3a330f359fbaf7011d7ba9904649d3100d (diff)
downloadspark-ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2.tar.gz
spark-ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2.tar.bz2
spark-ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2.zip
[SPARK-19450] Replace askWithRetry with askSync.
## What changes were proposed in this pull request? `askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and https://github.com/apache/spark/pull/16690#issuecomment-276850068) and `askWithRetry` is marked as deprecated. As mentioned SPARK-18113(https://github.com/apache/spark/pull/16503#event-927953218): >askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it. Since `askWithRetry` is just used inside spark and not in user logic. It might make sense to replace all of them with `askSync`. ## How was this patch tested? This PR doesn't change code logic, existing unit test can cover. Author: jinxing <jinxing@meituan.com> Closes #16790 from jinxing64/SPARK-19450.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala6
2 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 722024b8a6..f5c8a88f42 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -188,13 +188,13 @@ private[streaming] class ReceiverSupervisorImpl(
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
- trackerEndpoint.askWithRetry[Boolean](msg)
+ trackerEndpoint.askSync[Boolean](msg)
}
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
- trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
+ trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString))
logInfo("Stopped receiver " + streamId)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 8f55d982a9..bd7ab0b9bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -170,7 +170,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
trackerState = Stopping
if (!skipReceiverLaunch) {
// Send the stop signal to all the receivers
- endpoint.askWithRetry[Boolean](StopAllReceivers)
+ endpoint.askSync[Boolean](StopAllReceivers)
// Wait for the Spark job that runs the receivers to be over
// That is, for the receivers to quit gracefully.
@@ -183,7 +183,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
// Check if all the receivers have been deregistered or not
- val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds)
+ val receivers = endpoint.askSync[Seq[Int]](AllReceiverIds)
if (receivers.nonEmpty) {
logWarning("Not all of the receivers have deregistered, " + receivers)
} else {
@@ -249,7 +249,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
*/
def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
if (isTrackerStarted) {
- endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+ endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
_.runningExecutor.map {
_.executorId
}