author    Imran Rashid <irashid@cloudera.com>  2015-08-21 08:41:36 -0500
committer Tom Graves <tgraves@yahoo-inc.com>  2015-08-21 08:41:36 -0500
commit    708036c1de52d674ceff30ac465e1dcedeb8dde8 (patch)
tree      c9ecf08801d862c96c166d7db5915a66f4c89b56 /network
parent    bb220f6570aa0b95598b30524224a3e82c1effbc (diff)
[SPARK-9439] [YARN] External shuffle service robust to NM restarts using leveldb
https://issues.apache.org/jira/browse/SPARK-9439

In general, YARN apps should be robust to NodeManager restarts. However, if you run Spark with the external shuffle service on, all shuffles fail after an NM restart, because the shuffle service has lost the state describing each executor. (Note that the shuffle data itself is perfectly fine on disk across an NM restart; the problem is that we've lost the small bit of state that lets us *find* those files.)

The solution proposed here is that the external shuffle service writes its state out to leveldb (backed by a local file) every time an executor is added. When running with YARN, that file lives in the NM's local dir. Whenever the service starts, it looks for that file, and if it exists, it reads the file and re-registers all the executors recorded there.

Nothing is changed in non-YARN modes with this patch. The service is not given a place to save the state to, so it operates the same as before. This should make it easy to update other cluster managers as well, by just supplying the right file and the equivalent of YARN's `initializeApplication` -- I'm not familiar enough with those modes to know how to do that.

Author: Imran Rashid <irashid@cloudera.com>

Closes #7943 from squito/leveldb_external_shuffle_service_NM_restart and squashes the following commits:

0d285d3 [Imran Rashid] review feedback
70951d6 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
5c71c8c [Imran Rashid] save executor to db before registering; style
2499c8c [Imran Rashid] explicit dependency on jackson-annotations
795d28f [Imran Rashid] review feedback
81f80e2 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
594d520 [Imran Rashid] use json to serialize application executor info
1a7980b [Imran Rashid] version
8267d2a [Imran Rashid] style
e9f99e8 [Imran Rashid] cleanup the handling of bad dbs a little
9378ba3 [Imran Rashid] fail gracefully on corrupt leveldb files
acedb62 [Imran Rashid] switch to writing out one record per executor
79922b7 [Imran Rashid] rely on yarn to call stopApplication; assorted cleanup
12b6a35 [Imran Rashid] save registered executors when apps are removed; add tests
c878fbe [Imran Rashid] better explanation of shuffle service port handling
694934c [Imran Rashid] only open leveldb connection once per service
d596410 [Imran Rashid] store executor data in leveldb
59800b7 [Imran Rashid] Files.move in case renaming is unsupported
32fe5ae [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
d7450f0 [Imran Rashid] style
f729e2b [Imran Rashid] debugging
4492835 [Imran Rashid] lol, dont use a PrintWriter b/c of scalastyle checks
0a39b98 [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
55f49fc [Imran Rashid] make sure the service doesnt die if the registered executor file is corrupt; add tests
245db19 [Imran Rashid] style
62586a6 [Imran Rashid] just serialize the whole executors map
bdbbf0d [Imran Rashid] comments, remove some unnecessary changes
857331a [Imran Rashid] better tests & comments
bb9d1e6 [Imran Rashid] formatting
bdc4b32 [Imran Rashid] rename
86e0cb9 [Imran Rashid] for tests, shuffle service finds an open port
23994ff [Imran Rashid] style
7504de8 [Imran Rashid] style
a36729c [Imran Rashid] cleanup
efb6195 [Imran Rashid] proper unit test, and no longer leak if apps stop during NM restart
dd93dc0 [Imran Rashid] test for shuffle service w/ NM restarts
d596969 [Imran Rashid] cleanup imports
0e9d69b [Imran Rashid] better names
9eae119 [Imran Rashid] cleanup lots of duplication
1136f44 [Imran Rashid] test needs to have an actual shuffle
0b588bd [Imran Rashid] more fixes ...
ad122ef [Imran Rashid] more fixes
5e5a7c3 [Imran Rashid] fix build
c69f46b [Imran Rashid] maybe working version, needs tests & cleanup ...
bb3ba49 [Imran Rashid] minor cleanup
36127d3 [Imran Rashid] wip
b9d2ced [Imran Rashid] incomplete setup for external shuffle service tests
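For illustration, the save-and-reload pattern at the heart of this patch can be sketched in a few lines. The following is a minimal standalone example using the same libraries the patch adds (leveldbjni and Jackson); the class name, key layout, and file path here are illustrative, not the patch's exact code:

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;

public class ShuffleStateStoreSketch {
  // Same idea as the patch: a fixed prefix makes all registrations findable by a prefix seek.
  private static final String PREFIX = "AppExecShuffleInfo;";
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /** Write one record per executor: prefixed key, JSON value. */
  static void save(DB db, String appId, String execId, Object info) throws Exception {
    byte[] key = (PREFIX + appId + ";" + execId).getBytes(StandardCharsets.UTF_8);
    db.put(key, MAPPER.writeValueAsBytes(info));
  }

  /** On restart, seek to the prefix and read entries until the prefix stops matching. */
  static Map<String, byte[]> reload(DB db) throws Exception {
    Map<String, byte[]> out = new HashMap<String, byte[]>();
    try (DBIterator it = db.iterator()) {
      it.seek(PREFIX.getBytes(StandardCharsets.UTF_8));
      while (it.hasNext()) {
        Map.Entry<byte[], byte[]> e = it.next();
        String key = new String(e.getKey(), StandardCharsets.UTF_8);
        if (!key.startsWith(PREFIX)) {
          break;  // leveldb keys are sorted, so we are past our keyspace
        }
        out.put(key, e.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) throws Exception {
    Options options = new Options().createIfMissing(true);
    File file = new File(System.getProperty("java.io.tmpdir"), "registeredExecutors.ldb");
    try (DB db = JniDBFactory.factory.open(file, options)) {
      save(db, "app-1", "exec-0", new String[] { "/data/spark/dir1" });
      System.out.println(reload(db).keySet());  // [AppExecShuffleInfo;app-1;exec-0]
    }
  }
}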
Diffstat (limited to 'network')
-rw-r--r-- network/shuffle/pom.xml | 16
-rw-r--r-- network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java | 37
-rw-r--r-- network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java | 225
-rw-r--r-- network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java | 8
-rw-r--r-- network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java | 35
-rw-r--r-- network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java | 9
-rw-r--r-- network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java | 2
-rw-r--r-- network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java | 5
-rw-r--r-- network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java | 62
9 files changed, 353 insertions(+), 46 deletions(-)
diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml
index 532463e96f..3d2edf9d94 100644
--- a/network/shuffle/pom.xml
+++ b/network/shuffle/pom.xml
@@ -43,6 +43,22 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ <version>1.8</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index db9dc4f17c..0df1dd621f 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -17,11 +17,12 @@
package org.apache.spark.network.shuffle;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,10 +32,10 @@ import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-import org.apache.spark.network.shuffle.protocol.OpenBlocks;
-import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
-import org.apache.spark.network.shuffle.protocol.StreamHandle;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
+import org.apache.spark.network.shuffle.protocol.*;
+import org.apache.spark.network.util.TransportConf;
+
/**
* RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.
@@ -46,11 +47,13 @@ import org.apache.spark.network.shuffle.protocol.StreamHandle;
public class ExternalShuffleBlockHandler extends RpcHandler {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
- private final ExternalShuffleBlockResolver blockManager;
+ @VisibleForTesting
+ final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;
- public ExternalShuffleBlockHandler(TransportConf conf) {
- this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf));
+ public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException {
+ this(new OneForOneStreamManager(),
+ new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}
/** Enables mocking out the StreamManager and BlockManager. */
@@ -105,4 +108,22 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
blockManager.applicationRemoved(appId, cleanupLocalDirs);
}
+
+ /**
+ * Register an (application, executor) with the given shuffle info.
+ *
+ * The "re-" prefix highlights the intended use of this method: when this service is
+ * restarted, it is used to restore the state of executors from before the restart. Normal
+ * registration happens via a message handled in receive().
+ *
+ * @param appExecId the combined application and executor id
+ * @param executorInfo the executor's shuffle directory layout
+ */
+ public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo) {
+ blockManager.registerExecutor(appExecId.appId, appExecId.execId, executorInfo);
+ }
+
+ public void close() {
+ blockManager.close();
+ }
}
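The new reregisterExecutor hook is what lets recovered state flow back in without going through the normal RPC path. Below is a hedged sketch of how a non-YARN integration (which this patch deliberately leaves unchanged) might replay saved registrations; the class and the `saved` map are illustrative stand-ins for whatever store another cluster manager keeps:

import java.util.Map;

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

/** Hedged sketch: replaying previously saved registrations after a restart. */
public class RestoreExecutorsSketch {
  // `saved` stands in for whatever persistent map a cluster manager kept across
  // the restart; in YARN mode this patch reloads it from leveldb automatically.
  static void restore(ExternalShuffleBlockHandler handler,
                      Map<AppExecId, ExecutorShuffleInfo> saved) {
    for (Map.Entry<AppExecId, ExecutorShuffleInfo> e : saved.entrySet()) {
      handler.reregisterExecutor(e.getKey(), e.getValue());
    }
  }
}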
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 022ed88a16..79beec4429 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -17,19 +17,24 @@
package org.apache.spark.network.shuffle;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
+import java.io.*;
+import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,25 +57,87 @@ import org.apache.spark.network.util.TransportConf;
public class ExternalShuffleBlockResolver {
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
+ private static final ObjectMapper mapper = new ObjectMapper();
+ /**
+ * A common prefix on the key of each app registration we store in leveldb, so the
+ * registrations are easy to find: leveldb lets you search by key prefix.
+ */
+ private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
+ private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
+
// Map containing all registered executors' metadata.
- private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
+ @VisibleForTesting
+ final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;
private final TransportConf conf;
- public ExternalShuffleBlockResolver(TransportConf conf) {
- this(conf, Executors.newSingleThreadExecutor(
+ @VisibleForTesting
+ final File registeredExecutorFile;
+ @VisibleForTesting
+ final DB db;
+
+ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
+ throws IOException {
+ this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
// Add `spark` prefix because it will run in NM in Yarn mode.
NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
}
// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
- ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) {
+ ExternalShuffleBlockResolver(
+ TransportConf conf,
+ File registeredExecutorFile,
+ Executor directoryCleaner) throws IOException {
this.conf = conf;
- this.executors = Maps.newConcurrentMap();
+ this.registeredExecutorFile = registeredExecutorFile;
+ if (registeredExecutorFile != null) {
+ Options options = new Options();
+ options.createIfMissing(false);
+ options.logger(new LevelDBLogger());
+ DB tmpDb;
+ try {
+ tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+ } catch (NativeDB.DBException e) {
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+ logger.info("Creating state database at " + registeredExecutorFile);
+ options.createIfMissing(true);
+ try {
+ tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+ } catch (NativeDB.DBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+ } else {
+ // The leveldb file seems to be corrupt somehow. Let's just blow it away and create a new
+ // one, so we can keep processing new apps.
+ logger.error("Error opening leveldb file {}. Creating new file, will not be able to " +
+ "recover state for existing applications", registeredExecutorFile, e);
+ if (registeredExecutorFile.isDirectory()) {
+ for (File f : registeredExecutorFile.listFiles()) {
+ f.delete();
+ }
+ }
+ registeredExecutorFile.delete();
+ options.createIfMissing(true);
+ try {
+ tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+ } catch (NativeDB.DBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+ }
+ }
+ // if there is a version mismatch, we throw an exception, which means the service is unusable
+ checkVersion(tmpDb);
+ executors = reloadRegisteredExecutors(tmpDb);
+ db = tmpDb;
+ } else {
+ db = null;
+ executors = Maps.newConcurrentMap();
+ }
this.directoryCleaner = directoryCleaner;
}
@@ -81,6 +148,15 @@ public class ExternalShuffleBlockResolver {
ExecutorShuffleInfo executorInfo) {
AppExecId fullId = new AppExecId(appId, execId);
logger.info("Registered executor {} with {}", fullId, executorInfo);
+ try {
+ if (db != null) {
+ byte[] key = dbAppExecKey(fullId);
+ byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8);
+ db.put(key, value);
+ }
+ } catch (Exception e) {
+ logger.error("Error saving registered executors", e);
+ }
executors.put(fullId, executorInfo);
}
@@ -136,6 +212,13 @@ public class ExternalShuffleBlockResolver {
// Only touch executors associated with the appId that was removed.
if (appId.equals(fullId.appId)) {
it.remove();
+ if (db != null) {
+ try {
+ db.delete(dbAppExecKey(fullId));
+ } catch (IOException e) {
+ logger.error("Error deleting {} from executor state db", appId, e);
+ }
+ }
if (cleanupLocalDirs) {
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
@@ -220,12 +303,23 @@ public class ExternalShuffleBlockResolver {
return new File(new File(localDir, String.format("%02x", subDirId)), filename);
}
+ void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered executors", e);
+ }
+ }
+ }
+
/** Simply encodes an executor's full ID, which is appId + execId. */
- private static class AppExecId {
- final String appId;
- final String execId;
+ public static class AppExecId {
+ public final String appId;
+ public final String execId;
- private AppExecId(String appId, String execId) {
+ @JsonCreator
+ public AppExecId(@JsonProperty("appId") String appId, @JsonProperty("execId") String execId) {
this.appId = appId;
this.execId = execId;
}
@@ -252,4 +346,105 @@ public class ExternalShuffleBlockResolver {
.toString();
}
}
+
+ private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appExecJson = mapper.writeValueAsString(appExecId);
+ String key = (APP_KEY_PREFIX + ";" + appExecJson);
+ return key.getBytes(Charsets.UTF_8);
+ }
+
+ private static AppExecId parseDbAppExecKey(String s) throws IOException {
+ if (!s.startsWith(APP_KEY_PREFIX)) {
+ throw new IllegalArgumentException("expected a string starting with " + APP_KEY_PREFIX);
+ }
+ String json = s.substring(APP_KEY_PREFIX.length() + 1);
+ AppExecId parsed = mapper.readValue(json, AppExecId.class);
+ return parsed;
+ }
+
+ @VisibleForTesting
+ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db)
+ throws IOException {
+ ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
+ if (db != null) {
+ DBIterator itr = db.iterator();
+ itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), Charsets.UTF_8);
+ if (!key.startsWith(APP_KEY_PREFIX)) {
+ break;
+ }
+ AppExecId id = parseDbAppExecKey(key);
+ ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
+ registeredExecutors.put(id, shuffleInfo);
+ }
+ }
+ return registeredExecutors;
+ }
+
+ private static class LevelDBLogger implements org.iq80.leveldb.Logger {
+ private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);
+
+ @Override
+ public void log(String message) {
+ LOG.info(message);
+ }
+ }
+
+ /**
+ * Simple major.minor versioning scheme. Any incompatible changes should be across major
+ * versions. Minor version differences are allowed -- meaning we should be able to read
+ * dbs that are either earlier *or* later on the minor version.
+ */
+ private static void checkVersion(DB db) throws IOException {
+ byte[] bytes = db.get(StoreVersion.KEY);
+ if (bytes == null) {
+ storeVersion(db);
+ } else {
+ StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
+ if (version.major != CURRENT_VERSION.major) {
+ throw new IOException("cannot read state DB with version " + version + ", incompatible " +
+ "with current version " + CURRENT_VERSION);
+ }
+ storeVersion(db);
+ }
+ }
+
+ private static void storeVersion(DB db) throws IOException {
+ db.put(StoreVersion.KEY, mapper.writeValueAsBytes(CURRENT_VERSION));
+ }
+
+ public static class StoreVersion {
+
+ final static byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8);
+
+ public final int major;
+ public final int minor;
+
+ @JsonCreator
+ public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
+ this.major = major;
+ this.minor = minor;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ StoreVersion that = (StoreVersion) o;
+
+ return major == that.major && minor == that.minor;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = major;
+ result = 31 * result + minor;
+ return result;
+ }
+ }
+
}
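To make the on-disk layout concrete: each registration record's key is the fixed prefix, a ';', and the JSON form of the id pair, so a single prefix seek recovers them all. A small sketch reproducing the dbAppExecKey() layout; the mirror class below is illustrative, not the patch's own:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class KeyFormatSketch {
  /** Illustrative mirror of the patch's AppExecId, just to show the key layout. */
  public static class AppExecId {
    public final String appId;
    public final String execId;

    @JsonCreator
    public AppExecId(@JsonProperty("appId") String appId, @JsonProperty("execId") String execId) {
      this.appId = appId;
      this.execId = execId;
    }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // dbAppExecKey(): common prefix + ";" + JSON of the id pair.
    String key = "AppExecShuffleInfo;" + mapper.writeValueAsString(new AppExecId("app-1", "0"));
    System.out.println(key);  // AppExecShuffleInfo;{"appId":"app-1","execId":"0"}
  }
}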
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
index cadc8e8369..102d4efb8b 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -19,6 +19,8 @@ package org.apache.spark.network.shuffle.protocol;
import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
@@ -34,7 +36,11 @@ public class ExecutorShuffleInfo implements Encodable {
/** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
public final String shuffleManager;
- public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
+ @JsonCreator
+ public ExecutorShuffleInfo(
+ @JsonProperty("localDirs") String[] localDirs,
+ @JsonProperty("subDirsPerLocalDir") int subDirsPerLocalDir,
+ @JsonProperty("shuffleManager") String shuffleManager) {
this.localDirs = localDirs;
this.subDirsPerLocalDir = subDirsPerLocalDir;
this.shuffleManager = shuffleManager;
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index d02f4f0fdb..3c6cb367de 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -21,9 +21,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.CharStreams;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -59,8 +62,8 @@ public class ExternalShuffleBlockResolverSuite {
}
@Test
- public void testBadRequests() {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ public void testBadRequests() throws IOException {
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
// Unregistered executor
try {
resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
@@ -91,7 +94,7 @@ public class ExternalShuffleBlockResolverSuite {
@Test
public void testSortShuffleBlocks() throws IOException {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
@@ -110,7 +113,7 @@ public class ExternalShuffleBlockResolverSuite {
@Test
public void testHashShuffleBlocks() throws IOException {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
@@ -126,4 +129,28 @@ public class ExternalShuffleBlockResolverSuite {
block1Stream.close();
assertEquals(hashBlock1, block1);
}
+
+ @Test
+ public void jsonSerializationOfExecutorRegistration() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ AppExecId appId = new AppExecId("foo", "bar");
+ String appIdJson = mapper.writeValueAsString(appId);
+ AppExecId parsedAppId = mapper.readValue(appIdJson, AppExecId.class);
+ assertEquals(parsedAppId, appId);
+
+ ExecutorShuffleInfo shuffleInfo =
+ new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
+ String shuffleJson = mapper.writeValueAsString(shuffleInfo);
+ ExecutorShuffleInfo parsedShuffleInfo =
+ mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
+ assertEquals(parsedShuffleInfo, shuffleInfo);
+
+ // Intentionally keep these hard-coded strings in here, to check backwards-compatibility.
+ // It's not legacy yet, but we keep it here in case anybody changes the format.
+ String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
+ assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
+ String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
+ "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
+ assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
+ }
}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index d9d9c1bf2f..2f4f1d0df4 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -42,7 +42,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext = createSomeData();
ExternalShuffleBlockResolver resolver =
- new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
+ new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
resolver.applicationRemoved("app", false /* cleanup */);
@@ -65,7 +65,8 @@ public class ExternalShuffleCleanupSuite {
@Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
};
- ExternalShuffleBlockResolver manager = new ExternalShuffleBlockResolver(conf, noThreadExecutor);
+ ExternalShuffleBlockResolver manager =
+ new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
manager.applicationRemoved("app", true);
@@ -83,7 +84,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext1 = createSomeData();
ExternalShuffleBlockResolver resolver =
- new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
+ new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
@@ -99,7 +100,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext1 = createSomeData();
ExternalShuffleBlockResolver resolver =
- new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
+ new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 39aa49911d..a3f9a38b1a 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite {
dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
conf = new TransportConf(new SystemPropertyConfigProvider());
- handler = new ExternalShuffleBlockHandler(conf);
+ handler = new ExternalShuffleBlockHandler(conf, null);
TransportContext transportContext = new TransportContext(conf, handler);
server = transportContext.createServer();
}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index d4ec1956c1..aa99efda94 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -43,8 +43,9 @@ public class ExternalShuffleSecuritySuite {
TransportServer server;
@Before
- public void beforeEach() {
- TransportContext context = new TransportContext(conf, new ExternalShuffleBlockHandler(conf));
+ public void beforeEach() throws IOException {
+ TransportContext context =
+ new TransportContext(conf, new ExternalShuffleBlockHandler(conf, null));
TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf,
new TestSecretKeyHolder("my-app-id", "secret"));
this.server = context.createServer(Arrays.asList(bootstrap));
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 463f99ef33..11ea7f3fd3 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -17,25 +17,21 @@
package org.apache.spark.network.yarn;
+import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.apache.hadoop.yarn.server.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.sasl.SaslServerBootstrap;
import org.apache.spark.network.sasl.ShuffleSecretManager;
-import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
@@ -79,11 +75,26 @@ public class YarnShuffleService extends AuxiliaryService {
private TransportServer shuffleServer = null;
// Handles registering executors and opening shuffle blocks
- private ExternalShuffleBlockHandler blockHandler;
+ @VisibleForTesting
+ ExternalShuffleBlockHandler blockHandler;
+
+ // Where to store & reload executor info for recovering state after an NM restart
+ @VisibleForTesting
+ File registeredExecutorFile;
+
+ // Just for tests that need to discover the port the service actually bound to
+ @VisibleForTesting
+ static int boundPort = -1;
+
+ // Just for integration tests that want to look at this instance -- in general not
+ // sensible as a static
+ @VisibleForTesting
+ static YarnShuffleService instance;
public YarnShuffleService() {
super("spark_shuffle");
logger.info("Initializing YARN shuffle service for Spark");
+ instance = this;
}
/**
@@ -100,11 +111,24 @@ public class YarnShuffleService extends AuxiliaryService {
*/
@Override
protected void serviceInit(Configuration conf) {
+
+ // In case this NM was killed while there were running Spark applications, we need to restore
+ // lost state for the existing executors. We look for an existing file in the NM's local dirs.
+ // If we don't find one, then we choose a file to use to save the state next time. Even if
+ // an application was stopped while the NM was down, we expect YARN to call stopApplication()
+ // when it comes back.
+ registeredExecutorFile =
+ findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
+
TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
- blockHandler = new ExternalShuffleBlockHandler(transportConf);
+ try {
+ blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
+ } catch (Exception e) {
+ logger.error("Failed to initialize external shuffle service", e);
+ }
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
if (authEnabled) {
@@ -116,9 +140,13 @@ public class YarnShuffleService extends AuxiliaryService {
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
TransportContext transportContext = new TransportContext(transportConf, blockHandler);
shuffleServer = transportContext.createServer(port, bootstraps);
+ // the port should normally be fixed, but for tests it's useful to find an open port
+ port = shuffleServer.getPort();
+ boundPort = port;
String authEnabledString = authEnabled ? "enabled" : "not enabled";
logger.info("Started YARN shuffle service for Spark on port {}. " +
- "Authentication is {}.", port, authEnabledString);
+ "Authentication is {}. Registered executor file is {}", port, authEnabledString,
+ registeredExecutorFile);
}
@Override
@@ -161,6 +189,16 @@ public class YarnShuffleService extends AuxiliaryService {
logger.info("Stopping container {}", containerId);
}
+ private File findRegisteredExecutorFile(String[] localDirs) {
+ for (String dir: localDirs) {
+ File f = new File(dir, "registeredExecutors.ldb");
+ if (f.exists()) {
+ return f;
+ }
+ }
+ return new File(localDirs[0], "registeredExecutors.ldb");
+ }
+
/**
* Close the shuffle server to clean up any associated state.
*/
@@ -170,6 +208,9 @@ public class YarnShuffleService extends AuxiliaryService {
if (shuffleServer != null) {
shuffleServer.close();
}
+ if (blockHandler != null) {
+ blockHandler.close();
+ }
} catch (Exception e) {
logger.error("Exception when stopping service", e);
}
@@ -180,5 +221,4 @@ public class YarnShuffleService extends AuxiliaryService {
public ByteBuffer getMetaData() {
return ByteBuffer.allocate(0);
}
-
}
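Taken together, the restart behaviour can be exercised end to end against the handler alone. A sketch in the spirit of the patch's new tests; the temp-file handling is simplified, and the registration step is elided since it normally arrives over RPC:

import java.io.File;
import java.nio.file.Files;

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class NmRestartRecoverySketch {
  public static void main(String[] args) throws Exception {
    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    File recoveryFile = new File(
        Files.createTempDirectory("nm-local-dir").toFile(), "registeredExecutors.ldb");

    // "Before the restart": the handler creates the leveldb store on first use,
    // and every executor registration is also written through to it.
    ExternalShuffleBlockHandler before = new ExternalShuffleBlockHandler(conf, recoveryFile);
    // ... executors register via RegisterExecutor messages over RPC ...
    before.close();

    // "After the restart": a fresh handler pointed at the same file re-registers
    // everything it finds there, so existing shuffle files can still be located.
    ExternalShuffleBlockHandler after = new ExternalShuffleBlockHandler(conf, recoveryFile);
    after.close();
  }
}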