author     Michael Allman <michael@videoamp.com>    2016-08-25 11:57:38 -0700
committer  Shixiong Zhu <shixiong@databricks.com>   2016-08-25 11:57:38 -0700
commit     f2093107196b9af62908ecf15bac043f3b1e64c4 (patch)
tree       8e92e92cf3ae5376c2dfb19853d386fefef1b03c /common
parent     d2ae6399ee2f0524b88262735adbbcb2035de8fd (diff)
[SPARK-17231][CORE] Avoid building debug or trace log messages unless the respective log level is enabled
(This PR addresses https://issues.apache.org/jira/browse/SPARK-17231)

## What changes were proposed in this pull request?

While debugging the performance of a large GraphX connected components computation, we found several places in the `network-common` and `network-shuffle` code bases where trace or debug log messages are constructed even when the respective log level is disabled. According to YourKit, these constructions were creating substantial churn in the eden region. Refactoring the code to avoid building these messages unless the respective log level is enabled led to a modest but measurable reduction in our job's task time, GC time, and the ratio of the two.

## How was this patch tested?

We computed the connected components of a graph with about 2.6 billion vertices and 1.7 billion edges four times, on four different EC2 clusters, each with 8 r3.8xl worker nodes. Two test runs used Spark master; two used Spark master plus this PR.

The results from the first test run, master and master+PR:

![master](https://cloud.githubusercontent.com/assets/833693/17951634/7471cbca-6a18-11e6-9c26-78afe9319685.jpg)
![logging_perf_improvements](https://cloud.githubusercontent.com/assets/833693/17951632/7467844e-6a18-11e6-9a0e-053dc7650413.jpg)

The results from the second test run, master and master+PR:

![master 2](https://cloud.githubusercontent.com/assets/833693/17951633/746dd6aa-6a18-11e6-8e27-606680b3f105.jpg)
![logging_perf_improvements 2](https://cloud.githubusercontent.com/assets/833693/17951631/74488710-6a18-11e6-8a32-08692f373386.jpg)

Though modest, I believe these results are significant.

Author: Michael Allman <michael@videoamp.com>

Closes #14798 from mallman/spark-17231-logging_perf_improvements.
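The pattern the diff applies can be summarized in a standalone sketch. The snippet below is illustrative only and is not taken from the patch; `expensiveRemoteAddress()` is a hypothetical stand-in for calls like `NettyUtils.getRemoteAddress(channel)`, whose argument construction the commit moves behind a level check:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingGuardSketch {
  private static final Logger logger = LoggerFactory.getLogger(LoggingGuardSketch.class);

  // Hypothetical stand-in for an argument that is costly to build, such as
  // formatting a socket address on every call.
  private static String expensiveRemoteAddress() {
    return "host.example.com:7077";
  }

  public static void main(String[] args) {
    // String concatenation: the message (and the argument) is built even
    // when DEBUG is disabled.
    logger.debug("Creating new connection to " + expensiveRemoteAddress());

    // Parameterized logging: message formatting is deferred until the level
    // check inside debug(), but the argument expression still runs eagerly.
    logger.debug("Creating new connection to {}", expensiveRemoteAddress());

    // Level guard: neither the argument nor the message is built when the
    // level is off.
    if (logger.isDebugEnabled()) {
      logger.debug("Creating new connection to {}", expensiveRemoteAddress());
    }
  }
}
```

This distinction explains why some hunks below merely switch concatenation to `{}` placeholders (the arguments are cheap), while others add an explicit `isDebugEnabled()`/`isTraceEnabled()` guard around calls with expensive arguments.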
Diffstat (limited to 'common')
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java              | 39
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java       |  2
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java     | 15
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java              |  2
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java       |  6
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java       | 18
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java                |  2
-rw-r--r--  common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java  | 14
-rw-r--r--  common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java |  2
9 files changed, 55 insertions, 45 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 64a83171e9..a67683b892 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -43,7 +43,7 @@ import org.apache.spark.network.protocol.OneWayMessage;
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamRequest;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
/**
* Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
@@ -135,9 +135,10 @@ public class TransportClient implements Closeable {
long streamId,
final int chunkIndex,
final ChunkReceivedCallback callback) {
- final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
- logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
+ }
final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);
@@ -148,11 +149,13 @@ public class TransportClient implements Closeable {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
- logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
- timeTaken);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel),
+ timeTaken);
+ }
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
- serverAddr, future.cause());
+ getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeFetchRequest(streamChunkId);
channel.close();
@@ -173,9 +176,10 @@ public class TransportClient implements Closeable {
* @param callback Object to call with the stream data.
*/
public void stream(final String streamId, final StreamCallback callback) {
- final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
- logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
+ }
// Need to synchronize here so that the callback is added to the queue and the RPC is
// written to the socket atomically, so that callbacks are called in the right order
@@ -188,11 +192,13 @@ public class TransportClient implements Closeable {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
- logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
- timeTaken);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request for {} to {} took {} ms", streamId, getRemoteAddress(channel),
+ timeTaken);
+ }
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
- serverAddr, future.cause());
+ getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
@@ -215,9 +221,10 @@ public class TransportClient implements Closeable {
* @return The RPC's id.
*/
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
- final String serverAddr = NettyUtils.getRemoteAddress(channel);
final long startTime = System.currentTimeMillis();
- logger.trace("Sending RPC to {}", serverAddr);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+ }
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);
@@ -228,10 +235,12 @@ public class TransportClient implements Closeable {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
- logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken);
+ }
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
- serverAddr, future.cause());
+ getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeRpcRequest(requestId);
channel.close();
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index a27aaf2b27..1c9916baee 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -195,7 +195,7 @@ public class TransportClientFactory implements Closeable {
/** Create a completely new {@link TransportClient} to the remote address. */
private TransportClient createClient(InetSocketAddress address) throws IOException {
- logger.debug("Creating new connection to " + address);
+ logger.debug("Creating new connection to {}", address);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 8a69223c88..179667296e 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -38,7 +38,7 @@ import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.server.MessageHandler;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
import org.apache.spark.network.util.TransportFrameDecoder;
/**
@@ -122,7 +122,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
@Override
public void channelInactive() {
if (numOutstandingRequests() > 0) {
- String remoteAddress = NettyUtils.getRemoteAddress(channel);
+ String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
numOutstandingRequests(), remoteAddress);
failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
@@ -132,7 +132,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
@Override
public void exceptionCaught(Throwable cause) {
if (numOutstandingRequests() > 0) {
- String remoteAddress = NettyUtils.getRemoteAddress(channel);
+ String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
numOutstandingRequests(), remoteAddress);
failOutstandingRequests(cause);
@@ -141,13 +141,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
@Override
public void handle(ResponseMessage message) throws Exception {
- String remoteAddress = NettyUtils.getRemoteAddress(channel);
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} since it is not outstanding",
- resp.streamChunkId, remoteAddress);
+ resp.streamChunkId, getRemoteAddress(channel));
resp.body().release();
} else {
outstandingFetches.remove(resp.streamChunkId);
@@ -159,7 +158,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
- resp.streamChunkId, remoteAddress, resp.errorString);
+ resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
@@ -170,7 +169,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
- resp.requestId, remoteAddress, resp.body().size());
+ resp.requestId, getRemoteAddress(channel), resp.body().size());
} else {
outstandingRpcs.remove(resp.requestId);
try {
@@ -184,7 +183,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
- resp.requestId, remoteAddress, resp.errorString);
+ resp.requestId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingRpcs.remove(resp.requestId);
listener.onFailure(new RuntimeException(resp.errorString));
diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
index 074780f2b9..f045318618 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java
@@ -39,7 +39,7 @@ public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
Message.Type msgType = Message.Type.decode(in);
Message decoded = decode(msgType, in);
assert decoded.type() == msgType;
- logger.trace("Received message " + msgType + ": " + decoded);
+ logger.trace("Received message {}: {}", msgType, decoded);
out.add(decoded);
}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index f2223379a9..884ea7d115 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -29,7 +29,7 @@ import org.apache.spark.network.client.TransportResponseHandler;
import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.ResponseMessage;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
/**
* The single Transport-level Channel handler which is used for delegating requests to the
@@ -76,7 +76,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()),
+ logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()),
cause);
requestHandler.exceptionCaught(cause);
responseHandler.exceptionCaught(cause);
@@ -139,7 +139,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
if (responseHandler.numOutstandingRequests() > 0) {
- String address = NettyUtils.getRemoteAddress(ctx.channel());
+ String address = getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if " +
"this is wrong.", address, requestTimeoutNs / 1000 / 1000);
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index bebe88ec5d..e67a034cb8 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -17,6 +17,7 @@
package org.apache.spark.network.server;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import com.google.common.base.Throwables;
@@ -42,7 +43,7 @@ import org.apache.spark.network.protocol.RpcResponse;
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamRequest;
import org.apache.spark.network.protocol.StreamResponse;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
/**
* A handler that processes requests from clients and writes chunk data back. Each handler is
@@ -114,9 +115,9 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
}
private void processFetchRequest(final ChunkFetchRequest req) {
- final String client = NettyUtils.getRemoteAddress(channel);
-
- logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId);
+ }
ManagedBuffer buf;
try {
@@ -125,7 +126,7 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format(
- "Error opening block %s for request from %s", req.streamChunkId, client), e);
+ "Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e);
respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
return;
}
@@ -134,13 +135,12 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
}
private void processStreamRequest(final StreamRequest req) {
- final String client = NettyUtils.getRemoteAddress(channel);
ManagedBuffer buf;
try {
buf = streamManager.openStream(req.streamId);
} catch (Exception e) {
logger.error(String.format(
- "Error opening stream %s for request from %s", req.streamId, client), e);
+ "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
return;
}
@@ -189,13 +189,13 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
* it will be logged and the channel closed.
*/
private void respond(final Encodable result) {
- final String remoteAddress = channel.remoteAddress().toString();
+ final SocketAddress remoteAddress = channel.remoteAddress();
channel.writeAndFlush(result).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
- logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));
+ logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index baae235e02..a67db4f69f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -130,7 +130,7 @@ public class TransportServer implements Closeable {
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
- logger.debug("Shuffle server started on port :" + port);
+ logger.debug("Shuffle server started on port: {}", port);
}
@Override
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 1270cef621..d05d0ac4d2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -42,7 +42,7 @@ import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.apache.spark.network.shuffle.protocol.*;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
import org.apache.spark.network.util.TransportConf;
@@ -101,11 +101,13 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
blocks.add(block);
}
long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
- logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
- streamId,
- msg.blockIds.length,
- client.getClientId(),
- NettyUtils.getRemoteAddress(client.getChannel()));
+ if (logger.isTraceEnabled()) {
+ logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
+ streamId,
+ msg.blockIds.length,
+ client.getClientId(),
+ getRemoteAddress(client.getChannel()));
+ }
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
metrics.blockTransferRateBytes.mark(totalBlockSize);
} finally {
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 56cf1e2e3e..d436711692 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -267,7 +267,7 @@ public class ExternalShuffleBlockResolver {
for (String localDir : dirs) {
try {
JavaUtils.deleteRecursively(new File(localDir));
- logger.debug("Successfully cleaned up directory: " + localDir);
+ logger.debug("Successfully cleaned up directory: {}", localDir);
} catch (Exception e) {
logger.error("Failed to delete directory: " + localDir, e);
}