aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java39
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java2
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java15
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java2
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java6
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java18
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java2
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java14
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java2
9 files changed, 55 insertions, 45 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 64a83171e9..a67683b892 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -43,7 +43,7 @@ import org.apache.spark.network.protocol.OneWayMessage;
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamRequest;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
/**
* Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
@@ -135,9 +135,10 @@ public class TransportClient implements Closeable {
long streamId,
final int chunkIndex,
final ChunkReceivedCallback callback) {
- final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
- logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
+ }
final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);
@@ -148,11 +149,13 @@ public class TransportClient implements Closeable {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
- logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
- timeTaken);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel),
+ timeTaken);
+ }
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
- serverAddr, future.cause());
+ getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeFetchRequest(streamChunkId);
channel.close();
@@ -173,9 +176,10 @@ public class TransportClient implements Closeable {
* @param callback Object to call with the stream data.
*/
public void stream(final String streamId, final StreamCallback callback) {
- final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
- logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
+ }
// Need to synchronize here so that the callback is added to the queue and the RPC is
// written to the socket atomically, so that callbacks are called in the right order
@@ -188,11 +192,13 @@ public class TransportClient implements Closeable {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
- logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
- timeTaken);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request for {} to {} took {} ms", streamId, getRemoteAddress(channel),
+ timeTaken);
+ }
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
- serverAddr, future.cause());
+ getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
@@ -215,9 +221,10 @@ public class TransportClient implements Closeable {
* @return The RPC's id.
*/
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
- final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
- logger.trace("Sending RPC to {}", serverAddr);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+ }
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);
@@ -228,10 +235,12 @@ public class TransportClient implements Closeable {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
- logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken);
+ }
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
- serverAddr, future.cause());
+ getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeRpcRequest(requestId);
channel.close();
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index a27aaf2b27..1c9916baee 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -195,7 +195,7 @@ public class TransportClientFactory implements Closeable {
/** Create a completely new {@link TransportClient} to the remote address. */
private TransportClient createClient(InetSocketAddress address) throws IOException {
- logger.debug("Creating new connection to " + address);
+ logger.debug("Creating new connection to {}", address);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 8a69223c88..179667296e 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -38,7 +38,7 @@ import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.server.MessageHandler;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
import org.apache.spark.network.util.TransportFrameDecoder;
/**
@@ -122,7 +122,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
@Override
public void channelInactive() {
if (numOutstandingRequests() > 0) {
- String remoteAddress = NettyUtils.getRemoteAddress(channel);
+ String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
numOutstandingRequests(), remoteAddress);
failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
@@ -132,7 +132,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
@Override
public void exceptionCaught(Throwable cause) {
if (numOutstandingRequests() > 0) {
- String remoteAddress = NettyUtils.getRemoteAddress(channel);
+ String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
numOutstandingRequests(), remoteAddress);
failOutstandingRequests(cause);
@@ -141,13 +141,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
@Override
public void handle(ResponseMessage message) throws Exception {
- String remoteAddress = NettyUtils.getRemoteAddress(channel);
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} since it is not outstanding",
- resp.streamChunkId, remoteAddress);
+ resp.streamChunkId, getRemoteAddress(channel));
resp.body().release();
} else {
outstandingFetches.remove(resp.streamChunkId);
@@ -159,7 +158,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
- resp.streamChunkId, remoteAddress, resp.errorString);
+ resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
@@ -170,7 +169,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
- resp.requestId, remoteAddress, resp.body().size());
+ resp.requestId, getRemoteAddress(channel), resp.body().size());
} else {
outstandingRpcs.remove(resp.requestId);
try {
@@ -184,7 +183,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
- resp.requestId, remoteAddress, resp.errorString);
+ resp.requestId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingRpcs.remove(resp.requestId);
listener.onFailure(new RuntimeException(resp.errorString));
diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
index 074780f2b9..f045318618 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
@@ -39,7 +39,7 @@ public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
Message.Type msgType = Message.Type.decode(in);
Message decoded = decode(msgType, in);
assert decoded.type() == msgType;
- logger.trace("Received message " + msgType + ": " + decoded);
+ logger.trace("Received message {}: {}", msgType, decoded);
out.add(decoded);
}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index f2223379a9..884ea7d115 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -29,7 +29,7 @@ import org.apache.spark.network.client.TransportResponseHandler;
import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.ResponseMessage;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
/**
* The single Transport-level Channel handler which is used for delegating requests to the
@@ -76,7 +76,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()),
+ logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()),
cause);
requestHandler.exceptionCaught(cause);
responseHandler.exceptionCaught(cause);
@@ -139,7 +139,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
if (responseHandler.numOutstandingRequests() > 0) {
- String address = NettyUtils.getRemoteAddress(ctx.channel());
+ String address = getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if " +
"this is wrong.", address, requestTimeoutNs / 1000 / 1000);
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index bebe88ec5d..e67a034cb8 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -17,6 +17,7 @@
package org.apache.spark.network.server;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import com.google.common.base.Throwables;
@@ -42,7 +43,7 @@ import org.apache.spark.network.protocol.RpcResponse;
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.protocol.StreamResponse;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
/**
* A handler that processes requests from clients and writes chunk data back. Each handler is
@@ -114,9 +115,9 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
}
private void processFetchRequest(final ChunkFetchRequest req) {
- final String client = NettyUtils.getRemoteAddress(channel);
-
- logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId);
+ }
ManagedBuffer buf;
try {
@@ -125,7 +126,7 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format(
- "Error opening block %s for request from %s", req.streamChunkId, client), e);
+ "Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e);
respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
return;
}
@@ -134,13 +135,12 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
}
private void processStreamRequest(final StreamRequest req) {
- final String client = NettyUtils.getRemoteAddress(channel);
ManagedBuffer buf;
try {
buf = streamManager.openStream(req.streamId);
} catch (Exception e) {
logger.error(String.format(
- "Error opening stream %s for request from %s", req.streamId, client), e);
+ "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
return;
}
@@ -189,13 +189,13 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
* it will be logged and the channel closed.
*/
private void respond(final Encodable result) {
- final String remoteAddress = channel.remoteAddress().toString();
+ final SocketAddress remoteAddress = channel.remoteAddress();
channel.writeAndFlush(result).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
- logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));
+ logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index baae235e02..a67db4f69f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -130,7 +130,7 @@ public class TransportServer implements Closeable {
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
- logger.debug("Shuffle server started on port :" + port);
+ logger.debug("Shuffle server started on port: {}", port);
}
@Override
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 1270cef621..d05d0ac4d2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -42,7 +42,7 @@ import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.apache.spark.network.shuffle.protocol.*;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
import org.apache.spark.network.util.TransportConf;
@@ -101,11 +101,13 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
blocks.add(block);
}
long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
- logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
- streamId,
- msg.blockIds.length,
- client.getClientId(),
- NettyUtils.getRemoteAddress(client.getChannel()));
+ if (logger.isTraceEnabled()) {
+ logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
+ streamId,
+ msg.blockIds.length,
+ client.getClientId(),
+ getRemoteAddress(client.getChannel()));
+ }
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
metrics.blockTransferRateBytes.mark(totalBlockSize);
} finally {
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 56cf1e2e3e..d436711692 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -267,7 +267,7 @@ public class ExternalShuffleBlockResolver {
for (String localDir : dirs) {
try {
JavaUtils.deleteRecursively(new File(localDir));
- logger.debug("Successfully cleaned up directory: " + localDir);
+ logger.debug("Successfully cleaned up directory: {}", localDir);
} catch (Exception e) {
logger.error("Failed to delete directory: " + localDir, e);
}