aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorNishkam Ravi <nishkamravi@gmail.com>2016-01-26 21:14:39 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-01-26 21:14:39 -0800
commitbae3c9a4eb0c320999e5dbafd62692c12823e07d (patch)
tree5bb289c92e28e551767efba7fc8b7a8150a6b87c /core
parent58f5d8c1da6feeb598aa5f74ffe1593d4839d11d (diff)
downloadspark-bae3c9a4eb0c320999e5dbafd62692c12823e07d.tar.gz
spark-bae3c9a4eb0c320999e5dbafd62692c12823e07d.tar.bz2
spark-bae3c9a4eb0c320999e5dbafd62692c12823e07d.zip
[SPARK-12967][NETTY] Avoid NettyRpc error message during sparkContext shutdown
If there's an RPC issue while sparkContext is alive but stopped (which would happen only when executing SparkContext.stop), log a warning instead. This is a common occurrence. vanzin Author: Nishkam Ravi <nishkamravi@gmail.com> Author: nishkamravi2 <nishkamravi@gmail.com> Closes #10881 from nishkamravi2/master_netty.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala7
4 files changed, 32 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala
new file mode 100644
index 0000000000..c296cc23f1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.rpc
+
+private[rpc] class RpcEnvStoppedException()
+ extends IllegalStateException("RpcEnv already stopped.")
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 19259e0e80..6ceff2c073 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -106,7 +106,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
val iter = endpoints.keySet().iterator()
while (iter.hasNext) {
val name = iter.next
- postMessage(name, message, (e) => logWarning(s"Message $message dropped.", e))
+ postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}"))
}
}
@@ -156,7 +156,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
if (shouldCallOnStop) {
// We don't need to call `onStop` in the `synchronized` block
val error = if (stopped) {
- new IllegalStateException("RpcEnv already stopped.")
+ new RpcEnvStoppedException()
} else {
new SparkException(s"Could not find $endpointName or it has been stopped.")
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 9ae74d9d7b..89eda857e6 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -182,7 +182,11 @@ private[netty] class NettyRpcEnv(
val remoteAddr = message.receiver.address
if (remoteAddr == address) {
// Message to a local RPC endpoint.
- dispatcher.postOneWayMessage(message)
+ try {
+ dispatcher.postOneWayMessage(message)
+ } catch {
+ case e: RpcEnvStoppedException => logWarning(e.getMessage)
+ }
} else {
// Message to a remote RPC endpoint.
postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
index 2316ebe347..9fd64e8535 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
-import org.apache.spark.rpc.RpcAddress
+import org.apache.spark.rpc.{RpcAddress, RpcEnvStoppedException}
private[netty] sealed trait OutboxMessage {
@@ -43,7 +43,10 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo
}
override def onFailure(e: Throwable): Unit = {
- logWarning(s"Failed to send one-way RPC.", e)
+ e match {
+ case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
+ case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
+ }
}
}