aboutsummaryrefslogtreecommitdiff
path: root/common/network-shuffle
diff options
context:
space:
mode:
authorYangyang Liu <yangyangliu@fb.com>2016-07-12 10:13:58 -0700
committerReynold Xin <rxin@databricks.com>2016-07-12 10:13:58 -0700
commit68df47aca55e99406b7b67ef3d4b1008abf1b8b6 (patch)
tree3f55af3bf21e5079e4b33e1f7e0e523e1b2e1fc5 /common/network-shuffle
parentd513c99c19e229f72d03006e251725a43c13fefd (diff)
downloadspark-68df47aca55e99406b7b67ef3d4b1008abf1b8b6.tar.gz
spark-68df47aca55e99406b7b67ef3d4b1008abf1b8b6.tar.bz2
spark-68df47aca55e99406b7b67ef3d4b1008abf1b8b6.zip
[SPARK-16405] Add metrics and source for external shuffle service
## What changes were proposed in this pull request? Since externalShuffleService is essential for spark, better monitoring for shuffle service is necessary. In order to do so, we added various metrics in shuffle service and imported into ExternalShuffleServiceSource for metric system. Metrics added in shuffle service: * registeredExecutorsSize * openBlockRequestLatencyMillis * registerExecutorRequestLatencyMillis * blockTransferRateBytes JIRA Issue: https://issues.apache.org/jira/browse/SPARK-16405 ## How was this patch tested? Some test cases are added to verify metrics as expected in metric system. Those unit test cases are shown in `ExternalShuffleBlockHandlerSuite ` Author: Yangyang Liu <yangyangliu@fb.com> Closes #14080 from lovexi/yangyang-metrics.
Diffstat (limited to 'common/network-shuffle')
-rw-r--r--common/network-shuffle/pom.xml5
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java92
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java4
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java21
4 files changed, 105 insertions, 17 deletions
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 3c4f32933e..51c06b9e5a 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -55,6 +55,11 @@
</dependency>
<dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 22fd592a32..1cc0fb65d7 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -20,8 +20,15 @@ package org.apache.spark.network.shuffle;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
@@ -52,6 +59,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
@VisibleForTesting
final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;
+ private final ShuffleMetrics metrics;
public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
throws IOException {
@@ -64,6 +72,7 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
public ExternalShuffleBlockHandler(
OneForOneStreamManager streamManager,
ExternalShuffleBlockResolver blockManager) {
+ this.metrics = new ShuffleMetrics();
this.streamManager = streamManager;
this.blockManager = blockManager;
}
@@ -79,32 +88,50 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
TransportClient client,
RpcResponseCallback callback) {
if (msgObj instanceof OpenBlocks) {
- OpenBlocks msg = (OpenBlocks) msgObj;
- checkAuth(client, msg.appId);
-
- List<ManagedBuffer> blocks = Lists.newArrayList();
- for (String blockId : msg.blockIds) {
- blocks.add(blockManager.getBlockData(msg.appId, msg.execId, blockId));
+ final Timer.Context responseDelayContext = metrics.openBlockRequestLatencyMillis.time();
+ try {
+ OpenBlocks msg = (OpenBlocks) msgObj;
+ checkAuth(client, msg.appId);
+
+ List<ManagedBuffer> blocks = Lists.newArrayList();
+ long totalBlockSize = 0;
+ for (String blockId : msg.blockIds) {
+ final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
+ totalBlockSize += block != null ? block.size() : 0;
+ blocks.add(block);
+ }
+ long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
+ logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
+ streamId,
+ msg.blockIds.length,
+ client.getClientId(),
+ NettyUtils.getRemoteAddress(client.getChannel()));
+ callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
+ metrics.blockTransferRateBytes.mark(totalBlockSize);
+ } finally {
+ responseDelayContext.stop();
}
- long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
- logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
- streamId,
- msg.blockIds.length,
- client.getClientId(),
- NettyUtils.getRemoteAddress(client.getChannel()));
- callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
} else if (msgObj instanceof RegisterExecutor) {
- RegisterExecutor msg = (RegisterExecutor) msgObj;
- checkAuth(client, msg.appId);
- blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
- callback.onSuccess(ByteBuffer.wrap(new byte[0]));
+ final Timer.Context responseDelayContext = metrics.registerExecutorRequestLatencyMillis.time();
+ try {
+ RegisterExecutor msg = (RegisterExecutor) msgObj;
+ checkAuth(client, msg.appId);
+ blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
+ callback.onSuccess(ByteBuffer.wrap(new byte[0]));
+ } finally {
+ responseDelayContext.stop();
+ }
} else {
throw new UnsupportedOperationException("Unexpected message: " + msgObj);
}
}
+ public MetricSet getAllMetrics() {
+ return metrics;
+ }
+
@Override
public StreamManager getStreamManager() {
return streamManager;
@@ -143,4 +170,35 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
}
}
+ /**
+ * A simple class to wrap all shuffle service wrapper metrics
+ */
+ private class ShuffleMetrics implements MetricSet {
+ private final Map<String, Metric> allMetrics;
+ // Time latency for open block request in ms
+ private final Timer openBlockRequestLatencyMillis = new Timer();
+ // Time latency for executor registration latency in ms
+ private final Timer registerExecutorRequestLatencyMillis = new Timer();
+ // Block transfer rate in byte per second
+ private final Meter blockTransferRateBytes = new Meter();
+
+ private ShuffleMetrics() {
+ allMetrics = new HashMap<>();
+ allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
+ allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
+ allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
+ allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return blockManager.getRegisteredExecutorsSize();
+ }
+ });
+ }
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ return allMetrics;
+ }
+ }
+
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 54e870a9b5..7eefccaaed 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -146,6 +146,10 @@ public class ExternalShuffleBlockResolver {
this.directoryCleaner = directoryCleaner;
}
+ public int getRegisteredExecutorsSize() {
+ return executors.size();
+ }
+
/** Registers a new Executor with all the configuration we need to find its shuffle files. */
public void registerExecutor(
String appId,
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index c2e0b7447f..c036bc2e8d 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -20,6 +20,8 @@ package org.apache.spark.network.shuffle;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -66,6 +68,12 @@ public class ExternalShuffleBlockHandlerSuite {
verify(callback, times(1)).onSuccess(any(ByteBuffer.class));
verify(callback, never()).onFailure(any(Throwable.class));
+ // Verify register executor request latency metrics
+ Timer registerExecutorRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
+ .getAllMetrics()
+ .getMetrics()
+ .get("registerExecutorRequestLatencyMillis");
+ assertEquals(1, registerExecutorRequestLatencyMillis.getCount());
}
@SuppressWarnings("unchecked")
@@ -99,6 +107,19 @@ public class ExternalShuffleBlockHandlerSuite {
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
+
+ // Verify open block request latency metrics
+ Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
+ .getAllMetrics()
+ .getMetrics()
+ .get("openBlockRequestLatencyMillis");
+ assertEquals(1, openBlockRequestLatencyMillis.getCount());
+ // Verify block transfer metrics
+ Meter blockTransferRateBytes = (Meter) ((ExternalShuffleBlockHandler) handler)
+ .getAllMetrics()
+ .getMetrics()
+ .get("blockTransferRateBytes");
+ assertEquals(10, blockTransferRateBytes.getCount());
}
@Test