diff options
author | jinxing <jinxing@meituan.com> | 2017-02-19 04:34:07 -0800 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-02-19 04:34:07 -0800 |
commit | ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2 (patch) | |
tree | 0f8db3d54422c584fb03a492283792fff364e39b /streaming | |
parent | df3cbe3a330f359fbaf7011d7ba9904649d3100d (diff) | |
download | spark-ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2.tar.gz spark-ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2.tar.bz2 spark-ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2.zip |
[SPARK-19450] Replace askWithRetry with askSync.
## What changes were proposed in this pull request?
`askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and https://github.com/apache/spark/pull/16690#issuecomment-276850068) and `askWithRetry` is marked as deprecated.
As mentioned SPARK-18113(https://github.com/apache/spark/pull/16503#event-927953218):
>askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.
Since `askWithRetry` is just used inside spark and not in user logic. It might make sense to replace all of them with `askSync`.
## How was this patch tested?
This PR doesn't change code logic, existing unit test can cover.
Author: jinxing <jinxing@meituan.com>
Closes #16790 from jinxing64/SPARK-19450.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 4 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 6 |
2 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 722024b8a6..f5c8a88f42 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -188,13 +188,13 @@ private[streaming] class ReceiverSupervisorImpl( override protected def onReceiverStart(): Boolean = { val msg = RegisterReceiver( streamId, receiver.getClass.getSimpleName, host, executorId, endpoint) - trackerEndpoint.askWithRetry[Boolean](msg) + trackerEndpoint.askSync[Boolean](msg) } override protected def onReceiverStop(message: String, error: Option[Throwable]) { logInfo("Deregistering receiver " + streamId) val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("") - trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString)) + trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString)) logInfo("Stopped receiver " + streamId) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 8f55d982a9..bd7ab0b9bf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -170,7 +170,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false trackerState = Stopping if (!skipReceiverLaunch) { // Send the stop signal to all the receivers - endpoint.askWithRetry[Boolean](StopAllReceivers) + endpoint.askSync[Boolean](StopAllReceivers) // Wait for the Spark job that runs the receivers to be over // That is, for the receivers to quit gracefully. @@ -183,7 +183,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } // Check if all the receivers have been deregistered or not - val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds) + val receivers = endpoint.askSync[Seq[Int]](AllReceiverIds) if (receivers.nonEmpty) { logWarning("Not all of the receivers have deregistered, " + receivers) } else { @@ -249,7 +249,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false */ def allocatedExecutors(): Map[Int, Option[String]] = synchronized { if (isTrackerStarted) { - endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { + endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues { _.runningExecutor.map { _.executorId } |