diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-06-13 10:30:17 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-06-13 10:30:17 -0700 |
commit | 938434dc78f35f77cdebd15dcce8d5e7871b396b (patch) | |
tree | 42279122f9d30e151c7b5052664e60d4377d4dd5 /core/src | |
parent | cd47e233749f42b016264569a214cbf67f45f436 (diff) | |
download | spark-938434dc78f35f77cdebd15dcce8d5e7871b396b.tar.gz spark-938434dc78f35f77cdebd15dcce8d5e7871b396b.tar.bz2 spark-938434dc78f35f77cdebd15dcce8d5e7871b396b.zip |
[SPARK-15913][CORE] Dispatcher.stopped should be enclosed by synchronized block.
## What changes were proposed in this pull request?
`Dispatcher.stopped` is guarded by `this`, but it is used without synchronization in `postMessage` function. This PR fixes this and also the exception message became more accurate.
## How was this patch tested?
Pass the existing Jenkins tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #13634 from dongjoon-hyun/SPARK-15913.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala | 21 |
1 files changed, 8 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index 4f8fe018b4..d305de2e13 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -144,25 +144,20 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { endpointName: String, message: InboxMessage, callbackIfStopped: (Exception) => Unit): Unit = { - val shouldCallOnStop = synchronized { + val error = synchronized { val data = endpoints.get(endpointName) - if (stopped || data == null) { - true + if (stopped) { + Some(new RpcEnvStoppedException()) + } else if (data == null) { + Some(new SparkException(s"Could not find $endpointName.")) } else { data.inbox.post(message) receivers.offer(data) - false + None } } - if (shouldCallOnStop) { - // We don't need to call `onStop` in the `synchronized` block - val error = if (stopped) { - new RpcEnvStoppedException() - } else { - new SparkException(s"Could not find $endpointName or it has been stopped.") - } - callbackIfStopped(error) - } + // We don't need to call `onStop` in the `synchronized` block + error.foreach(callbackIfStopped) } def stop(): Unit = { |