diff options
author | Nishkam Ravi <nishkamravi@gmail.com> | 2016-01-26 21:14:39 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-01-26 21:14:39 -0800 |
commit | bae3c9a4eb0c320999e5dbafd62692c12823e07d (patch) | |
tree | 5bb289c92e28e551767efba7fc8b7a8150a6b87c | |
parent | 58f5d8c1da6feeb598aa5f74ffe1593d4839d11d (diff) | |
download | spark-bae3c9a4eb0c320999e5dbafd62692c12823e07d.tar.gz spark-bae3c9a4eb0c320999e5dbafd62692c12823e07d.tar.bz2 spark-bae3c9a4eb0c320999e5dbafd62692c12823e07d.zip |
[SPARK-12967][NETTY] Avoid NettyRpc error message during sparkContext shutdown
If there's an RPC issue while the SparkContext is alive but stopped (which would happen only while executing SparkContext.stop()), log a warning instead of an error. This is a common occurrence.
vanzin
Author: Nishkam Ravi <nishkamravi@gmail.com>
Author: nishkamravi2 <nishkamravi@gmail.com>
Closes #10881 from nishkamravi2/master_netty.
4 files changed, 32 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala new file mode 100644 index 0000000000..c296cc23f1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.rpc + +private[rpc] class RpcEnvStoppedException() + extends IllegalStateException("RpcEnv already stopped.") diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index 19259e0e80..6ceff2c073 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -106,7 +106,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { val iter = endpoints.keySet().iterator() while (iter.hasNext) { val name = iter.next - postMessage(name, message, (e) => logWarning(s"Message $message dropped.", e)) + postMessage(name, message, (e) => logWarning(s"Message $message dropped. 
${e.getMessage}")) } } @@ -156,7 +156,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { if (shouldCallOnStop) { // We don't need to call `onStop` in the `synchronized` block val error = if (stopped) { - new IllegalStateException("RpcEnv already stopped.") + new RpcEnvStoppedException() } else { new SparkException(s"Could not find $endpointName or it has been stopped.") } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 9ae74d9d7b..89eda857e6 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -182,7 +182,11 @@ private[netty] class NettyRpcEnv( val remoteAddr = message.receiver.address if (remoteAddr == address) { // Message to a local RPC endpoint. - dispatcher.postOneWayMessage(message) + try { + dispatcher.postOneWayMessage(message) + } catch { + case e: RpcEnvStoppedException => logWarning(e.getMessage) + } } else { // Message to a remote RPC endpoint. 
postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message))) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala index 2316ebe347..9fd64e8535 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala @@ -25,7 +25,7 @@ import scala.util.control.NonFatal import org.apache.spark.{Logging, SparkException} import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} -import org.apache.spark.rpc.RpcAddress +import org.apache.spark.rpc.{RpcAddress, RpcEnvStoppedException} private[netty] sealed trait OutboxMessage { @@ -43,7 +43,10 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo } override def onFailure(e: Throwable): Unit = { - logWarning(s"Failed to send one-way RPC.", e) + e match { + case e1: RpcEnvStoppedException => logWarning(e1.getMessage) + case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1) + } } } |