-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                        |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala                       |  17
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala            | 107
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala                                     |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala |  52
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala |   5
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java                 |   3
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/client/TransportClient.java              |   5
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java |   6
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java      |  12
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java |  72
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java |   4
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java |  60
-rwxr-xr-x  sbin/start-mesos-shuffle-service.sh                                                             |  35
-rwxr-xr-x  sbin/stop-mesos-shuffle-service.sh                                                              |  25
15 files changed, 394 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a1c66ef4fc..6f336a7c29 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2658,7 +2658,7 @@ object SparkContext extends Logging {
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
- new CoarseMesosSchedulerBackend(scheduler, sc, url)
+ new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager)
} else {
new MesosSchedulerBackend(scheduler, sc, url)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 4089c3e771..20a9faa178 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -27,6 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.sasl.SaslServerBootstrap
import org.apache.spark.network.server.TransportServer
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.util.TransportConf
import org.apache.spark.util.Utils
/**
@@ -45,11 +46,16 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
- private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
+ private val blockHandler = newShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = new TransportContext(transportConf, blockHandler)
private var server: TransportServer = _
+ /** Create a new shuffle block handler. Factored out for subclasses to override. */
+ protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
+ new ExternalShuffleBlockHandler(conf)
+ }
+
/** Starts the external shuffle service if the user has configured us to. */
def startIfEnabled() {
if (enabled) {
@@ -93,6 +99,13 @@ object ExternalShuffleService extends Logging {
private val barrier = new CountDownLatch(1)
def main(args: Array[String]): Unit = {
+ main(args, (conf: SparkConf, sm: SecurityManager) => new ExternalShuffleService(conf, sm))
+ }
+
+ /** A helper main method that allows the caller to call this with a custom shuffle service. */
+ private[spark] def main(
+ args: Array[String],
+ newShuffleService: (SparkConf, SecurityManager) => ExternalShuffleService): Unit = {
val sparkConf = new SparkConf
Utils.loadDefaultSparkProperties(sparkConf)
val securityManager = new SecurityManager(sparkConf)
@@ -100,7 +113,7 @@ object ExternalShuffleService extends Logging {
// we override this value since this service is started from the command line
// and we assume the user really wants it to be running
sparkConf.set("spark.shuffle.service.enabled", "true")
- server = new ExternalShuffleService(sparkConf, securityManager)
+ server = newShuffleService(sparkConf, securityManager)
server.start()
installShutdownHook()
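
The two hooks above make the service extensible without touching its networking setup: a subclass swaps in its own block handler via the factory method, and the parameterized main lets a wrapper object reuse the start-up and shutdown-hook logic. A minimal sketch of a hypothetical custom service follows; CustomShuffleService and CustomBlockHandler are illustrative names, not part of this patch, and the object must live under the org.apache.spark package since the helper main is private[spark]:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.deploy.ExternalShuffleService
    import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
    import org.apache.spark.network.util.TransportConf

    // Hypothetical handler; everything beyond construction is inherited.
    class CustomBlockHandler(conf: TransportConf) extends ExternalShuffleBlockHandler(conf)

    class CustomShuffleService(conf: SparkConf, sm: SecurityManager)
      extends ExternalShuffleService(conf, sm) {
      // The factory method added above is the single override point.
      protected override def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler =
        new CustomBlockHandler(conf)
    }

    object CustomShuffleService {
      // Reuses the start-up, shutdown-hook, and barrier logic of the helper main.
      def main(args: Array[String]): Unit =
        ExternalShuffleService.main(args, (conf, sm) => new CustomShuffleService(conf, sm))
    }
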
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
new file mode 100644
index 0000000000..061857476a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.net.SocketAddress
+
+import scala.collection.mutable
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
+import org.apache.spark.network.util.TransportConf
+
+/**
+ * An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
+ * It detects driver termination and calls the cleanup callback on [[ExternalShuffleService]].
+ */
+private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
+ extends ExternalShuffleBlockHandler(transportConf) with Logging {
+
+ // Stores a map of driver socket addresses to app ids
+ private val connectedApps = new mutable.HashMap[SocketAddress, String]
+
+ protected override def handleMessage(
+ message: BlockTransferMessage,
+ client: TransportClient,
+ callback: RpcResponseCallback): Unit = {
+ message match {
+ case RegisterDriverParam(appId) =>
+ val address = client.getSocketAddress
+ logDebug(s"Received registration request from app $appId (remote address $address).")
+ if (connectedApps.contains(address)) {
+ val existingAppId = connectedApps(address)
+ if (!existingAppId.equals(appId)) {
+ logError(s"A new app '$appId' has connected to existing address $address, " +
+ s"removing previously registered app '$existingAppId'.")
+ applicationRemoved(existingAppId, true)
+ }
+ }
+ connectedApps(address) = appId
+ callback.onSuccess(new Array[Byte](0))
+ case _ => super.handleMessage(message, client, callback)
+ }
+ }
+
+ /**
+ * On connection termination, clean up shuffle files written by the associated application.
+ */
+ override def connectionTerminated(client: TransportClient): Unit = {
+ val address = client.getSocketAddress
+ if (connectedApps.contains(address)) {
+ val appId = connectedApps(address)
+ logInfo(s"Application $appId disconnected (address was $address).")
+ applicationRemoved(appId, true /* cleanupLocalDirs */)
+ connectedApps.remove(address)
+ } else {
+ logWarning(s"Unknown $address disconnected.")
+ }
+ }
+
+ /** An extractor object for matching [[RegisterDriver]] messages. */
+ private object RegisterDriverParam {
+ def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
+ }
+}
+
+/**
+ * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers
+ * to associate with. This allows the shuffle service to detect when a driver is terminated
+ * and clean up the associated shuffle files.
+ */
+private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager)
+ extends ExternalShuffleService(conf, securityManager) {
+
+ protected override def newShuffleBlockHandler(
+ conf: TransportConf): ExternalShuffleBlockHandler = {
+ new MesosExternalShuffleBlockHandler(conf)
+ }
+}
+
+private[spark] object MesosExternalShuffleService extends Logging {
+
+ def main(args: Array[String]): Unit = {
+ ExternalShuffleService.main(args,
+ (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm))
+ }
+}
+
+
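
Since MesosExternalShuffleService simply wraps ExternalShuffleService with a driver-aware block handler, it can also be driven in-process, e.g. from a test. A hedged sketch, assuming code placed inside org.apache.spark.deploy.mesos (the class is private[mesos]) and using start() as inherited from ExternalShuffleService:

    package org.apache.spark.deploy.mesos  // required by the private[mesos] modifier

    import org.apache.spark.{SecurityManager, SparkConf}

    object StartMesosShuffleServiceExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf
        conf.set("spark.shuffle.service.enabled", "true")  // main() normally forces this on
        conf.set("spark.shuffle.service.port", "7337")     // default port the driver connects to
        val service = new MesosExternalShuffleService(conf, new SecurityManager(conf))
        service.start()  // inherited from ExternalShuffleService, as used by its main above
      }
    }
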
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
index d2b2baef1d..dfcbc51cdf 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
@@ -47,11 +47,11 @@ private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
*
* It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
*
- * The lift-cycle will be:
+ * The life-cycle of an endpoint is:
*
- * constructor onStart receive* onStop
+ * constructor -> onStart -> receive* -> onStop
*
- * Note: `receive` can be called concurrently. If you want `receive` is thread-safe, please use
+ * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
* [[ThreadSafeRpcEndpoint]]
*
* If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
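
For illustration, a minimal endpoint that follows the documented life-cycle; EchoEndpoint is a hypothetical name, and ThreadSafeRpcEndpoint is used per the note above so that receive need not be thread-safe:

    import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}

    // constructor -> onStart -> receive* -> onStop
    class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
      override def onStart(): Unit = println("started")      // runs once, before any message
      override def receive: PartialFunction[Any, Unit] = {
        case msg: String => println(s"echo: $msg")           // may run many times
      }
      override def onStop(): Unit = println("stopped")       // runs once, after the last message
    }
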
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index b7fde0d9b3..15a0915708 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -26,12 +26,15 @@ import scala.collection.mutable.{HashMap, HashSet}
import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
+
+import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -46,7 +49,8 @@ import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
private[spark] class CoarseMesosSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
- master: String)
+ master: String,
+ securityManager: SecurityManager)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with MScheduler
with MesosSchedulerUtils {
@@ -56,12 +60,19 @@ private[spark] class CoarseMesosSchedulerBackend(
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+ // If shuffle service is enabled, the Spark driver will register with the shuffle service.
+ // This is for cleaning up shuffle files reliably.
+ private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+
// Cores we have acquired with each Mesos task ID
val coresByTaskId = new HashMap[Int, Int]
var totalCoresAcquired = 0
val slaveIdsWithExecutors = new HashSet[String]
+ // Mapping from slave id to hostname
+ private val slaveIdToHost = new HashMap[String, String]
+
val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
// How many times tasks on each slave failed
val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
@@ -90,6 +101,19 @@ private[spark] class CoarseMesosSchedulerBackend(
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+ // A client for talking to the external shuffle service, if it is enabled
+ private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
+ if (shuffleServiceEnabled) {
+ Some(new MesosExternalShuffleClient(
+ SparkTransportConf.fromSparkConf(conf),
+ securityManager,
+ securityManager.isAuthenticationEnabled(),
+ securityManager.isSaslEncryptionEnabled()))
+ } else {
+ None
+ }
+ }
+
var nextMesosTaskId = 0
@volatile var appId: String = _
@@ -188,6 +212,7 @@ private[spark] class CoarseMesosSchedulerBackend(
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
appId = frameworkId.getValue
+ mesosExternalShuffleClient.foreach(_.init(appId))
logInfo("Registered as framework ID " + appId)
markRegistered()
}
@@ -244,6 +269,7 @@ private[spark] class CoarseMesosSchedulerBackend(
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+ slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
d.launchTasks(
Collections.singleton(offer.getId),
Collections.singleton(taskBuilder.build()), filters)
@@ -261,7 +287,27 @@ private[spark] class CoarseMesosSchedulerBackend(
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
logInfo(s"Mesos task $taskId is now $state")
+ val slaveId: String = status.getSlaveId.getValue
stateLock.synchronized {
+ // If the shuffle service is enabled, have the driver register with each one of the
+ // shuffle services. This allows the shuffle services to clean up state associated with
+ // this application when the driver exits. There is currently not a great way to detect
+ // this through Mesos, since the shuffle services are set up independently.
+ if (TaskState.fromMesos(state).equals(TaskState.RUNNING) &&
+ slaveIdToHost.contains(slaveId) &&
+ shuffleServiceEnabled) {
+ assume(mesosExternalShuffleClient.isDefined,
+ "External shuffle client was not instantiated even though shuffle service is enabled.")
+ // TODO: Remove this and allow the MesosExternalShuffleService to detect
+ // framework termination when new Mesos Framework HTTP API is available.
+ val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
+ val hostname = slaveIdToHost.remove(slaveId).get
+ logDebug(s"Connecting to shuffle service on slave $slaveId, " +
+ s"host $hostname, port $externalShufflePort for app ${conf.getAppId}")
+ mesosExternalShuffleClient.get
+ .registerDriverWithShuffleService(hostname, externalShufflePort)
+ }
+
if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
slaveIdsWithExecutors -= slaveId
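
The new registration path in statusUpdate only fires when the shuffle service is enabled and the backend is coarse-grained. A hedged example of the driver configuration that exercises it, with host:5050 as a placeholder master URL:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("mesos://host:5050")                // placeholder Mesos master
      .set("spark.mesos.coarse", "true")             // only the coarse backend registers
      .set("spark.shuffle.service.enabled", "true")  // instantiates mesosExternalShuffleClient
      .set("spark.shuffle.service.port", "7337")     // must match the port the service listens on
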
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 4b504df7b8..525ee0d3bd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter
import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SecurityManager, SparkFunSuite}
class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
with LocalSparkContext
@@ -59,7 +59,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
private def createSchedulerBackend(
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
- val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
+ val securityManager = mock[SecurityManager]
+ val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index de85720feb..5f95e2c74f 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -69,7 +69,8 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
} else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
memKey = "SPARK_EXECUTOR_MEMORY";
- } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService")) {
+ } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService") ||
+ className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) {
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
memKey = "SPARK_DAEMON_MEMORY";
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 37f2e34ceb..e8e7f06247 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.client;
import java.io.Closeable;
import java.io.IOException;
+import java.net.SocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -79,6 +80,10 @@ public class TransportClient implements Closeable {
return channel.isOpen() || channel.isActive();
}
+ public SocketAddress getSocketAddress() {
+ return channel.remoteAddress();
+ }
+
/**
* Requests a single chunk from the remote side, from the pre-negotiated streamId.
*
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index e4faaf8854..db9dc4f17c 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -65,7 +65,13 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
@Override
public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteArray(message);
+ handleMessage(msgObj, client, callback);
+ }
+ protected void handleMessage(
+ BlockTransferMessage msgObj,
+ TransportClient client,
+ RpcResponseCallback callback) {
if (msgObj instanceof OpenBlocks) {
OpenBlocks msg = (OpenBlocks) msgObj;
List<ManagedBuffer> blocks = Lists.newArrayList();
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 612bce571a..ea6d248d66 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -50,8 +50,8 @@ public class ExternalShuffleClient extends ShuffleClient {
private final boolean saslEncryptionEnabled;
private final SecretKeyHolder secretKeyHolder;
- private TransportClientFactory clientFactory;
- private String appId;
+ protected TransportClientFactory clientFactory;
+ protected String appId;
/**
* Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -71,6 +71,10 @@ public class ExternalShuffleClient extends ShuffleClient {
this.saslEncryptionEnabled = saslEncryptionEnabled;
}
+ protected void checkInit() {
+ assert appId != null : "Called before init()";
+ }
+
@Override
public void init(String appId) {
this.appId = appId;
@@ -89,7 +93,7 @@ public class ExternalShuffleClient extends ShuffleClient {
final String execId,
String[] blockIds,
BlockFetchingListener listener) {
- assert appId != null : "Called before init()";
+ checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
@@ -132,7 +136,7 @@ public class ExternalShuffleClient extends ShuffleClient {
int port,
String execId,
ExecutorShuffleInfo executorInfo) throws IOException {
- assert appId != null : "Called before init()";
+ checkInit();
TransportClient client = clientFactory.createClient(host, port);
byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
new file mode 100644
index 0000000000..7543b6be4f
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.mesos;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.shuffle.ExternalShuffleClient;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A client for talking to the external shuffle service in Mesos coarse-grained mode.
+ *
+ * This is used by the Spark driver to register with each external shuffle service on the cluster.
+ * The reason why the driver has to talk to the service is for cleaning up shuffle files reliably
+ * after the application exits. Mesos does not provide a great alternative to do this, so Spark
+ * has to detect this itself.
+ */
+public class MesosExternalShuffleClient extends ExternalShuffleClient {
+ private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class);
+
+ /**
+ * Creates a Mesos external shuffle client that wraps the {@link ExternalShuffleClient}.
+ * Please refer to docs on {@link ExternalShuffleClient} for more information.
+ */
+ public MesosExternalShuffleClient(
+ TransportConf conf,
+ SecretKeyHolder secretKeyHolder,
+ boolean saslEnabled,
+ boolean saslEncryptionEnabled) {
+ super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
+ }
+
+ public void registerDriverWithShuffleService(String host, int port) throws IOException {
+ checkInit();
+ byte[] registerDriver = new RegisterDriver(appId).toByteArray();
+ TransportClient client = clientFactory.createClient(host, port);
+ client.sendRpc(registerDriver, new RpcResponseCallback() {
+ @Override
+ public void onSuccess(byte[] response) {
+ logger.info("Successfully registered app " + appId + " with external shuffle service.");
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ logger.warn("Unable to register app " + appId + " with external shuffle service. " +
+ "Please manually remove shuffle data after driver exit. Error: " + e);
+ }
+ });
+ }
+}
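
A hedged sketch of how this client is driven, mirroring the calls CoarseMesosSchedulerBackend makes in this patch; the SparkConf, SecurityManager, and app id would come from the running driver, and "slave-hostname" is a placeholder:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.network.netty.SparkTransportConf
    import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient

    def registerWithShuffleService(conf: SparkConf, sm: SecurityManager, appId: String): Unit = {
      val client = new MesosExternalShuffleClient(
        SparkTransportConf.fromSparkConf(conf),
        sm,                                  // SecurityManager doubles as the SecretKeyHolder
        sm.isAuthenticationEnabled(),
        sm.isSaslEncryptionEnabled())
      client.init(appId)                     // must precede registration; sets the RegisterDriver app id
      client.registerDriverWithShuffleService("slave-hostname", 7337)  // asynchronous, logs the outcome
    }
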
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 6c1210b332..fcb52363e6 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
/**
* Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
@@ -37,7 +38,7 @@ public abstract class BlockTransferMessage implements Encodable {
/** Preceding every serialized message is its type, which allows us to deserialize it. */
public static enum Type {
- OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3);
+ OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4);
private final byte id;
@@ -60,6 +61,7 @@ public abstract class BlockTransferMessage implements Encodable {
case 1: return UploadBlock.decode(buf);
case 2: return RegisterExecutor.decode(buf);
case 3: return StreamHandle.decode(buf);
+ case 4: return RegisterDriver.decode(buf);
default: throw new IllegalArgumentException("Unknown message type: " + type);
}
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
new file mode 100644
index 0000000000..1c28fc1dff
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol.mesos;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+
+/**
+ * A message sent from the driver to register with the MesosExternalShuffleService.
+ */
+public class RegisterDriver extends BlockTransferMessage {
+ private final String appId;
+
+ public RegisterDriver(String appId) {
+ this.appId = appId;
+ }
+
+ public String getAppId() { return appId; }
+
+ @Override
+ protected Type type() { return Type.REGISTER_DRIVER; }
+
+ @Override
+ public int encodedLength() {
+ return Encoders.Strings.encodedLength(appId);
+ }
+
+ @Override
+ public void encode(ByteBuf buf) {
+ Encoders.Strings.encode(buf, appId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(appId);
+ }
+
+ public static RegisterDriver decode(ByteBuf buf) {
+ String appId = Encoders.Strings.decode(buf);
+ return new RegisterDriver(appId);
+ }
+}
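
On the wire, the REGISTER_DRIVER type byte (4) added to BlockTransferMessage precedes the encoded app id, and Decoder.fromByteArray dispatches on it. A round-trip sketch using the encode/decode paths shown in this patch, with an illustrative app id:

    import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
    import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver

    val original = new RegisterDriver("app-20150805-0001")   // illustrative app id
    val bytes = original.toByteArray                         // [type byte][length-prefixed appId]
    val decoded = BlockTransferMessage.Decoder.fromByteArray(bytes).asInstanceOf[RegisterDriver]
    assert(decoded.getAppId == original.getAppId)
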
diff --git a/sbin/start-mesos-shuffle-service.sh b/sbin/start-mesos-shuffle-service.sh
new file mode 100755
index 0000000000..64580762c5
--- /dev/null
+++ b/sbin/start-mesos-shuffle-service.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Starts the Mesos external shuffle service on the machine this script is executed on.
+# The Mesos external shuffle service detects when an application exits and automatically
+# cleans up its shuffle files.
+#
+# Usage: start-mesos-shuffle-service.sh
+#
+# Use the SPARK_SHUFFLE_OPTS environment variable to set shuffle service configuration.
+#
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+. "$sbin/spark-config.sh"
+. "$SPARK_PREFIX/bin/load-spark-env.sh"
+
+exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.mesos.MesosExternalShuffleService 1
diff --git a/sbin/stop-mesos-shuffle-service.sh b/sbin/stop-mesos-shuffle-service.sh
new file mode 100755
index 0000000000..0e965d5ec5
--- /dev/null
+++ b/sbin/stop-mesos-shuffle-service.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stops the Mesos external shuffle service on the machine this script is executed on.
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.mesos.MesosExternalShuffleService 1