aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala10
1 files changed, 5 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index f1a8273f15..7bf44a6565 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -66,7 +66,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
}
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
- receivers.put(data) // for the OnStart message
+ receivers.offer(data) // for the OnStart message
}
endpointRef
}
@@ -80,7 +80,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
val data = endpoints.remove(name)
if (data != null) {
data.inbox.stop()
- receivers.put(data) // for the OnStop message
+ receivers.offer(data) // for the OnStop message
}
// Don't clean `endpointRefs` here because it's possible that some messages are being processed
// now and they can use `getRpcEndpointRef`. So `endpointRefs` will be cleaned in Inbox via
@@ -163,7 +163,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
true
} else {
data.inbox.post(createMessageFn(data.ref))
- receivers.put(data)
+ receivers.offer(data)
false
}
}
@@ -183,7 +183,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
// Stop all endpoints. This will queue all endpoints for processing by the message loops.
endpoints.keySet().asScala.foreach(unregisterRpcEndpoint)
// Enqueue a message that tells the message loops to stop.
- receivers.put(PoisonPill)
+ receivers.offer(PoisonPill)
threadpool.shutdown()
}
@@ -218,7 +218,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
val data = receivers.take()
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
- receivers.put(PoisonPill)
+ receivers.offer(PoisonPill)
return
}
data.inbox.process(Dispatcher.this)