aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorDongjoon Hyun <dongjoon@apache.org>2016-06-13 10:30:17 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-06-13 10:30:17 -0700
commit938434dc78f35f77cdebd15dcce8d5e7871b396b (patch)
tree42279122f9d30e151c7b5052664e60d4377d4dd5 /core/src
parentcd47e233749f42b016264569a214cbf67f45f436 (diff)
downloadspark-938434dc78f35f77cdebd15dcce8d5e7871b396b.tar.gz
spark-938434dc78f35f77cdebd15dcce8d5e7871b396b.tar.bz2
spark-938434dc78f35f77cdebd15dcce8d5e7871b396b.zip
[SPARK-15913][CORE] Dispatcher.stopped should be enclosed by synchronized block.
## What changes were proposed in this pull request? `Dispatcher.stopped` is guarded by `this`, but it is used without synchronization in `postMessage` function. This PR fixes this and also the exception message became more accurate. ## How was this patch tested? Pass the existing Jenkins tests. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #13634 from dongjoon-hyun/SPARK-15913.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala21
1 files changed, 8 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 4f8fe018b4..d305de2e13 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -144,25 +144,20 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
- val shouldCallOnStop = synchronized {
+ val error = synchronized {
val data = endpoints.get(endpointName)
- if (stopped || data == null) {
- true
+ if (stopped) {
+ Some(new RpcEnvStoppedException())
+ } else if (data == null) {
+ Some(new SparkException(s"Could not find $endpointName."))
} else {
data.inbox.post(message)
receivers.offer(data)
- false
+ None
}
}
- if (shouldCallOnStop) {
- // We don't need to call `onStop` in the `synchronized` block
- val error = if (stopped) {
- new RpcEnvStoppedException()
- } else {
- new SparkException(s"Could not find $endpointName or it has been stopped.")
- }
- callbackIfStopped(error)
- }
+ // We don't need to call `onStop` in the `synchronized` block
+ error.foreach(callbackIfStopped)
}
def stop(): Unit = {