aboutsummaryrefslogtreecommitdiff
path: root/network/shuffle/src
diff options
context:
space:
mode:
authorTimothy Chen <tnachen@gmail.com>2015-08-03 01:55:58 -0700
committerAndrew Or <andrew@databricks.com>2015-08-03 01:55:58 -0700
commit95dccc63350c45045f038bab9f8a5080b4e1f8cc (patch)
treec6ac9b266234a3d349ce4ada3a80a17adc7744fc /network/shuffle/src
parent1ebd41b141a95ec264bd2dd50f0fe24cd459035d (diff)
downloadspark-95dccc63350c45045f038bab9f8a5080b4e1f8cc.tar.gz
spark-95dccc63350c45045f038bab9f8a5080b4e1f8cc.tar.bz2
spark-95dccc63350c45045f038bab9f8a5080b4e1f8cc.zip
[SPARK-8873] [MESOS] Clean up shuffle files if external shuffle service is used
This patch builds directly on #7820, which is largely written by tnachen. The only addition is one commit for cleaning up the code. There should be no functional differences between this and #7820. Author: Timothy Chen <tnachen@gmail.com> Author: Andrew Or <andrew@databricks.com> Closes #7881 from andrewor14/tim-cleanup-mesos-shuffle and squashes the following commits: 8894f7d [Andrew Or] Clean up code 2a5fa10 [Andrew Or] Merge branch 'mesos_shuffle_clean' of github.com:tnachen/spark into tim-cleanup-mesos-shuffle fadff89 [Timothy Chen] Address comments. e4d0f1d [Timothy Chen] Clean up external shuffle data on driver exit with Mesos.
Diffstat (limited to 'network/shuffle/src')
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java6
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java12
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java72
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java4
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java60
5 files changed, 149 insertions, 5 deletions
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index e4faaf8854..db9dc4f17c 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -65,7 +65,13 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
@Override
public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteArray(message);
+ handleMessage(msgObj, client, callback);
+ }
+ protected void handleMessage(
+ BlockTransferMessage msgObj,
+ TransportClient client,
+ RpcResponseCallback callback) {
if (msgObj instanceof OpenBlocks) {
OpenBlocks msg = (OpenBlocks) msgObj;
List<ManagedBuffer> blocks = Lists.newArrayList();
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 612bce571a..ea6d248d66 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -50,8 +50,8 @@ public class ExternalShuffleClient extends ShuffleClient {
private final boolean saslEncryptionEnabled;
private final SecretKeyHolder secretKeyHolder;
- private TransportClientFactory clientFactory;
- private String appId;
+ protected TransportClientFactory clientFactory;
+ protected String appId;
/**
* Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -71,6 +71,10 @@ public class ExternalShuffleClient extends ShuffleClient {
this.saslEncryptionEnabled = saslEncryptionEnabled;
}
+ protected void checkInit() {
+ assert appId != null : "Called before init()";
+ }
+
@Override
public void init(String appId) {
this.appId = appId;
@@ -89,7 +93,7 @@ public class ExternalShuffleClient extends ShuffleClient {
final String execId,
String[] blockIds,
BlockFetchingListener listener) {
- assert appId != null : "Called before init()";
+ checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
@@ -132,7 +136,7 @@ public class ExternalShuffleClient extends ShuffleClient {
int port,
String execId,
ExecutorShuffleInfo executorInfo) throws IOException {
- assert appId != null : "Called before init()";
+ checkInit();
TransportClient client = clientFactory.createClient(host, port);
byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
new file mode 100644
index 0000000000..7543b6be4f
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.mesos;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.shuffle.ExternalShuffleClient;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A client for talking to the external shuffle service in Mesos coarse-grained mode.
+ *
+ * This is used by the Spark driver to register with each external shuffle service on the cluster.
+ * The reason why the driver has to talk to the service is for cleaning up shuffle files reliably
+ * after the application exits. Mesos does not provide a great alternative to do this, so Spark
+ * has to detect this itself.
+ */
+public class MesosExternalShuffleClient extends ExternalShuffleClient {
+ private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class);
+
+ /**
+ * Creates an Mesos external shuffle client that wraps the {@link ExternalShuffleClient}.
+ * Please refer to docs on {@link ExternalShuffleClient} for more information.
+ */
+ public MesosExternalShuffleClient(
+ TransportConf conf,
+ SecretKeyHolder secretKeyHolder,
+ boolean saslEnabled,
+ boolean saslEncryptionEnabled) {
+ super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
+ }
+
+ public void registerDriverWithShuffleService(String host, int port) throws IOException {
+ checkInit();
+ byte[] registerDriver = new RegisterDriver(appId).toByteArray();
+ TransportClient client = clientFactory.createClient(host, port);
+ client.sendRpc(registerDriver, new RpcResponseCallback() {
+ @Override
+ public void onSuccess(byte[] response) {
+ logger.info("Successfully registered app " + appId + " with external shuffle service.");
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ logger.warn("Unable to register app " + appId + " with external shuffle service. " +
+ "Please manually remove shuffle data after driver exit. Error: " + e);
+ }
+ });
+ }
+}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 6c1210b332..fcb52363e6 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
/**
* Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
@@ -37,7 +38,7 @@ public abstract class BlockTransferMessage implements Encodable {
/** Preceding every serialized message is its type, which allows us to deserialize it. */
public static enum Type {
- OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3);
+ OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4);
private final byte id;
@@ -60,6 +61,7 @@ public abstract class BlockTransferMessage implements Encodable {
case 1: return UploadBlock.decode(buf);
case 2: return RegisterExecutor.decode(buf);
case 3: return StreamHandle.decode(buf);
+ case 4: return RegisterDriver.decode(buf);
default: throw new IllegalArgumentException("Unknown message type: " + type);
}
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
new file mode 100644
index 0000000000..1c28fc1dff
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol.mesos;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+
+/**
+ * A message sent from the driver to register with the MesosExternalShuffleService.
+ */
+public class RegisterDriver extends BlockTransferMessage {
+ private final String appId;
+
+ public RegisterDriver(String appId) {
+ this.appId = appId;
+ }
+
+ public String getAppId() { return appId; }
+
+ @Override
+ protected Type type() { return Type.REGISTER_DRIVER; }
+
+ @Override
+ public int encodedLength() {
+ return Encoders.Strings.encodedLength(appId);
+ }
+
+ @Override
+ public void encode(ByteBuf buf) {
+ Encoders.Strings.encode(buf, appId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(appId);
+ }
+
+ public static RegisterDriver decode(ByteBuf buf) {
+ String appId = Encoders.Strings.decode(buf);
+ return new RegisterDriver(appId);
+ }
+}