aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org
diff options
context:
space:
mode:
authorjinxing <jinxing@meituan.com>2017-02-19 04:34:07 -0800
committerSean Owen <sowen@cloudera.com>2017-02-19 04:34:07 -0800
commitba8912e5f3d5c5a366cb3d1f6be91f2471d048d2 (patch)
tree0f8db3d54422c584fb03a492283792fff364e39b /sql/core/src/main/scala/org
parentdf3cbe3a330f359fbaf7011d7ba9904649d3100d (diff)
downloadspark-ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2.tar.gz
spark-ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2.tar.bz2
spark-ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2.zip
[SPARK-19450] Replace askWithRetry with askSync.
## What changes were proposed in this pull request? `askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and https://github.com/apache/spark/pull/16690#issuecomment-276850068) and `askWithRetry` is marked as deprecated. As mentioned in SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218): >askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it. Since `askWithRetry` is only used inside Spark and not in user logic, it makes sense to replace all of its uses with `askSync`. ## How was this patch tested? This PR doesn't change code logic; existing unit tests can cover it. Author: jinxing <jinxing@meituan.com> Closes #16790 from jinxing64/SPARK-19450.
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala8
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 267d17623d..d0f81887e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -88,21 +88,21 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
/** Verify whether the given executor has the active instance of a state store */
private[state] def verifyIfInstanceActive(storeId: StateStoreId, executorId: String): Boolean = {
- rpcEndpointRef.askWithRetry[Boolean](VerifyIfInstanceActive(storeId, executorId))
+ rpcEndpointRef.askSync[Boolean](VerifyIfInstanceActive(storeId, executorId))
}
/** Get the location of the state store */
private[state] def getLocation(storeId: StateStoreId): Option[String] = {
- rpcEndpointRef.askWithRetry[Option[String]](GetLocation(storeId))
+ rpcEndpointRef.askSync[Option[String]](GetLocation(storeId))
}
/** Deactivate instances related to a set of operators */
private[state] def deactivateInstances(storeRootLocation: String): Unit = {
- rpcEndpointRef.askWithRetry[Boolean](DeactivateInstances(storeRootLocation))
+ rpcEndpointRef.askSync[Boolean](DeactivateInstances(storeRootLocation))
}
private[state] def stop(): Unit = {
- rpcEndpointRef.askWithRetry[Boolean](StopCoordinator)
+ rpcEndpointRef.askSync[Boolean](StopCoordinator)
}
}