From 95dccc63350c45045f038bab9f8a5080b4e1f8cc Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Mon, 3 Aug 2015 01:55:58 -0700 Subject: [SPARK-8873] [MESOS] Clean up shuffle files if external shuffle service is used This patch builds directly on #7820, which is largely written by tnachen. The only addition is one commit for cleaning up the code. There should be no functional differences between this and #7820. Author: Timothy Chen Author: Andrew Or Closes #7881 from andrewor14/tim-cleanup-mesos-shuffle and squashes the following commits: 8894f7d [Andrew Or] Clean up code 2a5fa10 [Andrew Or] Merge branch 'mesos_shuffle_clean' of github.com:tnachen/spark into tim-cleanup-mesos-shuffle fadff89 [Timothy Chen] Address comments. e4d0f1d [Timothy Chen] Clean up external shuffle data on driver exit with Mesos. --- .../main/scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/deploy/ExternalShuffleService.scala | 17 +++- .../deploy/mesos/MesosExternalShuffleService.scala | 107 +++++++++++++++++++++ .../scala/org/apache/spark/rpc/RpcEndpoint.scala | 6 +- .../mesos/CoarseMesosSchedulerBackend.scala | 52 +++++++++- .../mesos/CoarseMesosSchedulerBackendSuite.scala | 5 +- .../spark/launcher/SparkClassCommandBuilder.java | 3 +- .../spark/network/client/TransportClient.java | 5 + .../shuffle/ExternalShuffleBlockHandler.java | 6 ++ .../network/shuffle/ExternalShuffleClient.java | 12 ++- .../shuffle/mesos/MesosExternalShuffleClient.java | 72 ++++++++++++++ .../shuffle/protocol/BlockTransferMessage.java | 4 +- .../shuffle/protocol/mesos/RegisterDriver.java | 60 ++++++++++++ sbin/start-mesos-shuffle-service.sh | 35 +++++++ sbin/stop-mesos-shuffle-service.sh | 25 +++++ 15 files changed, 394 insertions(+), 17 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala create mode 100644 network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java create mode 100644 network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java create mode 100755 sbin/start-mesos-shuffle-service.sh create mode 100755 sbin/stop-mesos-shuffle-service.sh diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a1c66ef4fc..6f336a7c29 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2658,7 +2658,7 @@ object SparkContext extends Logging { val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false) val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs val backend = if (coarseGrained) { - new CoarseMesosSchedulerBackend(scheduler, sc, url) + new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager) } else { new MesosSchedulerBackend(scheduler, sc, url) } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 4089c3e771..20a9faa178 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -27,6 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.sasl.SaslServerBootstrap import org.apache.spark.network.server.TransportServer import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler +import org.apache.spark.network.util.TransportConf import org.apache.spark.util.Utils /** @@ -45,11 +46,16 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana private val useSasl: Boolean = securityManager.isAuthenticationEnabled() private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0) - private val blockHandler = new ExternalShuffleBlockHandler(transportConf) + private val blockHandler = newShuffleBlockHandler(transportConf) private val transportContext: TransportContext = new TransportContext(transportConf, blockHandler) private var server: TransportServer = _ + /** Create a new shuffle block handler. Factored out for subclasses to override. */ + protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = { + new ExternalShuffleBlockHandler(conf) + } + /** Starts the external shuffle service if the user has configured us to. */ def startIfEnabled() { if (enabled) { @@ -93,6 +99,13 @@ object ExternalShuffleService extends Logging { private val barrier = new CountDownLatch(1) def main(args: Array[String]): Unit = { + main(args, (conf: SparkConf, sm: SecurityManager) => new ExternalShuffleService(conf, sm)) + } + + /** A helper main method that allows the caller to call this with a custom shuffle service. */ + private[spark] def main( + args: Array[String], + newShuffleService: (SparkConf, SecurityManager) => ExternalShuffleService): Unit = { val sparkConf = new SparkConf Utils.loadDefaultSparkProperties(sparkConf) val securityManager = new SecurityManager(sparkConf) @@ -100,7 +113,7 @@ object ExternalShuffleService extends Logging { // we override this value since this service is started from the command line // and we assume the user really wants it to be running sparkConf.set("spark.shuffle.service.enabled", "true") - server = new ExternalShuffleService(sparkConf, securityManager) + server = newShuffleService(sparkConf, securityManager) server.start() installShutdownHook() diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala new file mode 100644 index 0000000000..061857476a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import java.net.SocketAddress + +import scala.collection.mutable + +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.deploy.ExternalShuffleService +import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage +import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver +import org.apache.spark.network.util.TransportConf + +/** + * An RPC endpoint that receives registration requests from Spark drivers running on Mesos. + * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]]. + */ +private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf) + extends ExternalShuffleBlockHandler(transportConf) with Logging { + + // Stores a map of driver socket addresses to app ids + private val connectedApps = new mutable.HashMap[SocketAddress, String] + + protected override def handleMessage( + message: BlockTransferMessage, + client: TransportClient, + callback: RpcResponseCallback): Unit = { + message match { + case RegisterDriverParam(appId) => + val address = client.getSocketAddress + logDebug(s"Received registration request from app $appId (remote address $address).") + if (connectedApps.contains(address)) { + val existingAppId = connectedApps(address) + if (!existingAppId.equals(appId)) { + logError(s"A new app '$appId' has connected to existing address $address, " + + s"removing previously registered app '$existingAppId'.") + applicationRemoved(existingAppId, true) + } + } + connectedApps(address) = appId + callback.onSuccess(new Array[Byte](0)) + case _ => super.handleMessage(message, client, callback) + } + } + + /** + * On connection termination, clean up shuffle files written by the associated application. + */ + override def connectionTerminated(client: TransportClient): Unit = { + val address = client.getSocketAddress + if (connectedApps.contains(address)) { + val appId = connectedApps(address) + logInfo(s"Application $appId disconnected (address was $address).") + applicationRemoved(appId, true /* cleanupLocalDirs */) + connectedApps.remove(address) + } else { + logWarning(s"Unknown $address disconnected.") + } + } + + /** An extractor object for matching [[RegisterDriver]] message. */ + private object RegisterDriverParam { + def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId) + } +} + +/** + * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers + * to associate with. This allows the shuffle service to detect when a driver is terminated + * and can clean up the associated shuffle files. + */ +private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager) + extends ExternalShuffleService(conf, securityManager) { + + protected override def newShuffleBlockHandler( + conf: TransportConf): ExternalShuffleBlockHandler = { + new MesosExternalShuffleBlockHandler(conf) + } +} + +private[spark] object MesosExternalShuffleService extends Logging { + + def main(args: Array[String]): Unit = { + ExternalShuffleService.main(args, + (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm)) + } +} + + diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala index d2b2baef1d..dfcbc51cdf 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala @@ -47,11 +47,11 @@ private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint * * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence. * - * The lift-cycle will be: + * The life-cycle of an endpoint is: * - * constructor onStart receive* onStop + * constructor -> onStart -> receive* -> onStop * - * Note: `receive` can be called concurrently. If you want `receive` is thread-safe, please use + * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use * [[ThreadSafeRpcEndpoint]] * * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index b7fde0d9b3..15a0915708 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -26,12 +26,15 @@ import scala.collection.mutable.{HashMap, HashSet} import com.google.common.collect.HashBiMap import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} -import org.apache.mesos.{Scheduler => MScheduler, _} +import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver} + +import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState} +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcAddress import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils -import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -46,7 +49,8 @@ import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState} private[spark] class CoarseMesosSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext, - master: String) + master: String, + securityManager: SecurityManager) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with MScheduler with MesosSchedulerUtils { @@ -56,12 +60,19 @@ private[spark] class CoarseMesosSchedulerBackend( // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + // If shuffle service is enabled, the Spark driver will register with the shuffle service. + // This is for cleaning up shuffle files reliably. + private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) + // Cores we have acquired with each Mesos task ID val coresByTaskId = new HashMap[Int, Int] var totalCoresAcquired = 0 val slaveIdsWithExecutors = new HashSet[String] + // Maping from slave Id to hostname + private val slaveIdToHost = new HashMap[String, String] + val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String] // How many times tasks on each slave failed val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int] @@ -90,6 +101,19 @@ private[spark] class CoarseMesosSchedulerBackend( private val slaveOfferConstraints = parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) + // A client for talking to the external shuffle service, if it is a + private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = { + if (shuffleServiceEnabled) { + Some(new MesosExternalShuffleClient( + SparkTransportConf.fromSparkConf(conf), + securityManager, + securityManager.isAuthenticationEnabled(), + securityManager.isSaslEncryptionEnabled())) + } else { + None + } + } + var nextMesosTaskId = 0 @volatile var appId: String = _ @@ -188,6 +212,7 @@ private[spark] class CoarseMesosSchedulerBackend( override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { appId = frameworkId.getValue + mesosExternalShuffleClient.foreach(_.init(appId)) logInfo("Registered as framework ID " + appId) markRegistered() } @@ -244,6 +269,7 @@ private[spark] class CoarseMesosSchedulerBackend( // accept the offer and launch the task logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") + slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname d.launchTasks( Collections.singleton(offer.getId), Collections.singleton(taskBuilder.build()), filters) @@ -261,7 +287,27 @@ private[spark] class CoarseMesosSchedulerBackend( val taskId = status.getTaskId.getValue.toInt val state = status.getState logInfo(s"Mesos task $taskId is now $state") + val slaveId: String = status.getSlaveId.getValue stateLock.synchronized { + // If the shuffle service is enabled, have the driver register with each one of the + // shuffle services. This allows the shuffle services to clean up state associated with + // this application when the driver exits. There is currently not a great way to detect + // this through Mesos, since the shuffle services are set up independently. + if (TaskState.fromMesos(state).equals(TaskState.RUNNING) && + slaveIdToHost.contains(slaveId) && + shuffleServiceEnabled) { + assume(mesosExternalShuffleClient.isDefined, + "External shuffle client was not instantiated even though shuffle service is enabled.") + // TODO: Remove this and allow the MesosExternalShuffleService to detect + // framework termination when new Mesos Framework HTTP API is available. + val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337) + val hostname = slaveIdToHost.remove(slaveId).get + logDebug(s"Connecting to shuffle service on slave $slaveId, " + + s"host $hostname, port $externalShufflePort for app ${conf.getAppId}") + mesosExternalShuffleClient.get + .registerDriverWithShuffleService(hostname, externalShufflePort) + } + if (TaskState.isFinished(TaskState.fromMesos(state))) { val slaveId = taskIdToSlaveId(taskId) slaveIdsWithExecutors -= slaveId diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index 4b504df7b8..525ee0d3bd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -30,7 +30,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.BeforeAndAfter import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SecurityManager, SparkFunSuite} class CoarseMesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext @@ -59,7 +59,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite private def createSchedulerBackend( taskScheduler: TaskSchedulerImpl, driver: SchedulerDriver): CoarseMesosSchedulerBackend = { - val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") { + val securityManager = mock[SecurityManager] + val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) { override protected def createSchedulerDriver( masterUrl: String, scheduler: Scheduler, diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java index de85720feb..5f95e2c74f 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java @@ -69,7 +69,8 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder { } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) { javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); memKey = "SPARK_EXECUTOR_MEMORY"; - } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService")) { + } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService") || + className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) { javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); javaOptsKeys.add("SPARK_SHUFFLE_OPTS"); memKey = "SPARK_DAEMON_MEMORY"; diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java index 37f2e34ceb..e8e7f06247 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -19,6 +19,7 @@ package org.apache.spark.network.client; import java.io.Closeable; import java.io.IOException; +import java.net.SocketAddress; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -79,6 +80,10 @@ public class TransportClient implements Closeable { return channel.isOpen() || channel.isActive(); } + public SocketAddress getSocketAddress() { + return channel.remoteAddress(); + } + /** * Requests a single chunk from the remote side, from the pre-negotiated streamId. * diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index e4faaf8854..db9dc4f17c 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -65,7 +65,13 @@ public class ExternalShuffleBlockHandler extends RpcHandler { @Override public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) { BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteArray(message); + handleMessage(msgObj, client, callback); + } + protected void handleMessage( + BlockTransferMessage msgObj, + TransportClient client, + RpcResponseCallback callback) { if (msgObj instanceof OpenBlocks) { OpenBlocks msg = (OpenBlocks) msgObj; List blocks = Lists.newArrayList(); diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 612bce571a..ea6d248d66 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -50,8 +50,8 @@ public class ExternalShuffleClient extends ShuffleClient { private final boolean saslEncryptionEnabled; private final SecretKeyHolder secretKeyHolder; - private TransportClientFactory clientFactory; - private String appId; + protected TransportClientFactory clientFactory; + protected String appId; /** * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled, @@ -71,6 +71,10 @@ public class ExternalShuffleClient extends ShuffleClient { this.saslEncryptionEnabled = saslEncryptionEnabled; } + protected void checkInit() { + assert appId != null : "Called before init()"; + } + @Override public void init(String appId) { this.appId = appId; @@ -89,7 +93,7 @@ public class ExternalShuffleClient extends ShuffleClient { final String execId, String[] blockIds, BlockFetchingListener listener) { - assert appId != null : "Called before init()"; + checkInit(); logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId); try { RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = @@ -132,7 +136,7 @@ public class ExternalShuffleClient extends ShuffleClient { int port, String execId, ExecutorShuffleInfo executorInfo) throws IOException { - assert appId != null : "Called before init()"; + checkInit(); TransportClient client = clientFactory.createClient(host, port); byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray(); client.sendRpcSync(registerMessage, 5000 /* timeoutMs */); diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java new file mode 100644 index 0000000000..7543b6be4f --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle.mesos; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.sasl.SecretKeyHolder; +import org.apache.spark.network.shuffle.ExternalShuffleClient; +import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver; +import org.apache.spark.network.util.TransportConf; + +/** + * A client for talking to the external shuffle service in Mesos coarse-grained mode. + * + * This is used by the Spark driver to register with each external shuffle service on the cluster. + * The reason why the driver has to talk to the service is for cleaning up shuffle files reliably + * after the application exits. Mesos does not provide a great alternative to do this, so Spark + * has to detect this itself. + */ +public class MesosExternalShuffleClient extends ExternalShuffleClient { + private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class); + + /** + * Creates an Mesos external shuffle client that wraps the {@link ExternalShuffleClient}. + * Please refer to docs on {@link ExternalShuffleClient} for more information. + */ + public MesosExternalShuffleClient( + TransportConf conf, + SecretKeyHolder secretKeyHolder, + boolean saslEnabled, + boolean saslEncryptionEnabled) { + super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled); + } + + public void registerDriverWithShuffleService(String host, int port) throws IOException { + checkInit(); + byte[] registerDriver = new RegisterDriver(appId).toByteArray(); + TransportClient client = clientFactory.createClient(host, port); + client.sendRpc(registerDriver, new RpcResponseCallback() { + @Override + public void onSuccess(byte[] response) { + logger.info("Successfully registered app " + appId + " with external shuffle service."); + } + + @Override + public void onFailure(Throwable e) { + logger.warn("Unable to register app " + appId + " with external shuffle service. " + + "Please manually remove shuffle data after driver exit. Error: " + e); + } + }); + } +} diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index 6c1210b332..fcb52363e6 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.apache.spark.network.protocol.Encodable; +import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver; /** * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or @@ -37,7 +38,7 @@ public abstract class BlockTransferMessage implements Encodable { /** Preceding every serialized message is its type, which allows us to deserialize it. */ public static enum Type { - OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3); + OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4); private final byte id; @@ -60,6 +61,7 @@ public abstract class BlockTransferMessage implements Encodable { case 1: return UploadBlock.decode(buf); case 2: return RegisterExecutor.decode(buf); case 3: return StreamHandle.decode(buf); + case 4: return RegisterDriver.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java new file mode 100644 index 0000000000..1c28fc1dff --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle.protocol.mesos; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; + +import org.apache.spark.network.protocol.Encoders; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; + +/** + * A message sent from the driver to register with the MesosExternalShuffleService. + */ +public class RegisterDriver extends BlockTransferMessage { + private final String appId; + + public RegisterDriver(String appId) { + this.appId = appId; + } + + public String getAppId() { return appId; } + + @Override + protected Type type() { return Type.REGISTER_DRIVER; } + + @Override + public int encodedLength() { + return Encoders.Strings.encodedLength(appId); + } + + @Override + public void encode(ByteBuf buf) { + Encoders.Strings.encode(buf, appId); + } + + @Override + public int hashCode() { + return Objects.hashCode(appId); + } + + public static RegisterDriver decode(ByteBuf buf) { + String appId = Encoders.Strings.decode(buf); + return new RegisterDriver(appId); + } +} diff --git a/sbin/start-mesos-shuffle-service.sh b/sbin/start-mesos-shuffle-service.sh new file mode 100755 index 0000000000..64580762c5 --- /dev/null +++ b/sbin/start-mesos-shuffle-service.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Starts the Mesos external shuffle server on the machine this script is executed on. +# The Mesos external shuffle service detects when an application exits and automatically +# cleans up its shuffle files. +# +# Usage: start-mesos-shuffle-server.sh +# +# Use the SPARK_SHUFFLE_OPTS environment variable to set shuffle service configuration. +# + +sbin="`dirname "$0"`" +sbin="`cd "$sbin"; pwd`" + +. "$sbin/spark-config.sh" +. "$SPARK_PREFIX/bin/load-spark-env.sh" + +exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.mesos.MesosExternalShuffleService 1 diff --git a/sbin/stop-mesos-shuffle-service.sh b/sbin/stop-mesos-shuffle-service.sh new file mode 100755 index 0000000000..0e965d5ec5 --- /dev/null +++ b/sbin/stop-mesos-shuffle-service.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Stops the Mesos external shuffle service on the machine this script is executed on. + +sbin="`dirname "$0"`" +sbin="`cd "$sbin"; pwd`" + +"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.mesos.MesosExternalShuffleService 1 -- cgit v1.2.3