Diffstat (limited to 'network')
-rw-r--r--  network/shuffle/pom.xml | 16
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java | 37
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java | 225
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java | 8
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java | 35
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java | 9
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java | 2
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java | 5
-rw-r--r--  network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java | 62
9 files changed, 353 insertions(+), 46 deletions(-)
diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml
index 532463e96f..3d2edf9d94 100644
--- a/network/shuffle/pom.xml
+++ b/network/shuffle/pom.xml
@@ -43,6 +43,22 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ <version>1.8</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index db9dc4f17c..0df1dd621f 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -17,11 +17,12 @@
package org.apache.spark.network.shuffle;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,10 +32,10 @@ import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
-import org.apache.spark.network.shuffle.protocol.OpenBlocks;
-import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
-import org.apache.spark.network.shuffle.protocol.StreamHandle;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
+import org.apache.spark.network.shuffle.protocol.*;
+import org.apache.spark.network.util.TransportConf;
+
/**
* RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.
@@ -46,11 +47,13 @@ import org.apache.spark.network.shuffle.protocol.StreamHandle;
public class ExternalShuffleBlockHandler extends RpcHandler {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
- private final ExternalShuffleBlockResolver blockManager;
+ @VisibleForTesting
+ final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;
- public ExternalShuffleBlockHandler(TransportConf conf) {
- this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf));
+ public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException {
+ this(new OneForOneStreamManager(),
+ new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}
/** Enables mocking out the StreamManager and BlockManager. */
@@ -105,4 +108,22 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
blockManager.applicationRemoved(appId, cleanupLocalDirs);
}
+
+ /**
+ * Register an (application, executor) with the given shuffle info.
+ *
+ * The "re-" is meant to highlight the intended use of this method -- when this service is
+ * restarted, this is used to restore the state of executors from before the restart. Normal
+ * registration will happen via a message handled in receive()
+ *
+ * @param appExecId
+ * @param executorInfo
+ */
+ public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo) {
+ blockManager.registerExecutor(appExecId.appId, appExecId.execId, executorInfo);
+ }
+
+ public void close() {
+ blockManager.close();
+ }
}
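
To illustrate the reregisterExecutor() API added above: a minimal sketch, not part of this patch, of replaying saved registrations through the handler after a restart. The RestoreSketch class and the saved map are hypothetical; in this patch the resolver itself reloads its leveldb state in its constructor, so this only shows the call shape.

import java.util.Map;

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

public class RestoreSketch {
  // Replay each saved (app, executor) registration through the handler,
  // exactly as it existed before the restart.
  static void restore(
      ExternalShuffleBlockHandler handler,
      Map<AppExecId, ExecutorShuffleInfo> saved) {
    for (Map.Entry<AppExecId, ExecutorShuffleInfo> e : saved.entrySet()) {
      handler.reregisterExecutor(e.getKey(), e.getValue());
    }
  }
}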
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 022ed88a16..79beec4429 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -17,19 +17,24 @@
package org.apache.spark.network.shuffle;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
+import java.io.*;
+import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,25 +57,87 @@ import org.apache.spark.network.util.TransportConf;
public class ExternalShuffleBlockResolver {
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
+ private static final ObjectMapper mapper = new ObjectMapper();
+ /**
+ * This is a common prefix for the key of each app registration we store in leveldb, so the
+ * entries are easy to find, since leveldb lets you search by key prefix.
+ */
+ private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
+ private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
+
// Map containing all registered executors' metadata.
- private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
+ @VisibleForTesting
+ final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;
private final TransportConf conf;
- public ExternalShuffleBlockResolver(TransportConf conf) {
- this(conf, Executors.newSingleThreadExecutor(
+ @VisibleForTesting
+ final File registeredExecutorFile;
+ @VisibleForTesting
+ final DB db;
+
+ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
+ throws IOException {
+ this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
// Add `spark` prefix because it will run in NM in Yarn mode.
NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
}
// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
- ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) {
+ ExternalShuffleBlockResolver(
+ TransportConf conf,
+ File registeredExecutorFile,
+ Executor directoryCleaner) throws IOException {
this.conf = conf;
- this.executors = Maps.newConcurrentMap();
+ this.registeredExecutorFile = registeredExecutorFile;
+ if (registeredExecutorFile != null) {
+ Options options = new Options();
+ options.createIfMissing(false);
+ options.logger(new LevelDBLogger());
+ DB tmpDb;
+ try {
+ tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+ } catch (NativeDB.DBException e) {
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+ logger.info("Creating state database at " + registeredExecutorFile);
+ options.createIfMissing(true);
+ try {
+ tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+ } catch (NativeDB.DBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+ } else {
+ // The leveldb file seems to be corrupt somehow. Let's just blow it away and create a new
+ // one, so we can keep processing new apps.
+ logger.error("Error opening leveldb file {}. Creating new file, will not be able to " +
+ "recover state for existing applications", registeredExecutorFile, e);
+ if (registeredExecutorFile.isDirectory()) {
+ for (File f : registeredExecutorFile.listFiles()) {
+ f.delete();
+ }
+ }
+ registeredExecutorFile.delete();
+ options.createIfMissing(true);
+ try {
+ tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+ } catch (NativeDB.DBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+ }
+ }
+ // if there is a version mismatch, we throw an exception, which means the service is unusable
+ checkVersion(tmpDb);
+ executors = reloadRegisteredExecutors(tmpDb);
+ db = tmpDb;
+ } else {
+ db = null;
+ executors = Maps.newConcurrentMap();
+ }
this.directoryCleaner = directoryCleaner;
}
@@ -81,6 +148,15 @@ public class ExternalShuffleBlockResolver {
ExecutorShuffleInfo executorInfo) {
AppExecId fullId = new AppExecId(appId, execId);
logger.info("Registered executor {} with {}", fullId, executorInfo);
+ try {
+ if (db != null) {
+ byte[] key = dbAppExecKey(fullId);
+ byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8);
+ db.put(key, value);
+ }
+ } catch (Exception e) {
+ logger.error("Error saving registered executors", e);
+ }
executors.put(fullId, executorInfo);
}
@@ -136,6 +212,13 @@ public class ExternalShuffleBlockResolver {
// Only touch executors associated with the appId that was removed.
if (appId.equals(fullId.appId)) {
it.remove();
+ if (db != null) {
+ try {
+ db.delete(dbAppExecKey(fullId));
+ } catch (IOException e) {
+ logger.error("Error deleting {} from executor state db", appId, e);
+ }
+ }
if (cleanupLocalDirs) {
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
@@ -220,12 +303,23 @@ public class ExternalShuffleBlockResolver {
return new File(new File(localDir, String.format("%02x", subDirId)), filename);
}
+ void close() {
+ if (db != null) {
+ try {
+ db.close();
+ } catch (IOException e) {
+ logger.error("Exception closing leveldb with registered executors", e);
+ }
+ }
+ }
+
/** Simply encodes an executor's full ID, which is appId + execId. */
- private static class AppExecId {
- final String appId;
- final String execId;
+ public static class AppExecId {
+ public final String appId;
+ public final String execId;
- private AppExecId(String appId, String execId) {
+ @JsonCreator
+ public AppExecId(@JsonProperty("appId") String appId, @JsonProperty("execId") String execId) {
this.appId = appId;
this.execId = execId;
}
@@ -252,4 +346,105 @@ public class ExternalShuffleBlockResolver {
.toString();
}
}
+
+ private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appExecJson = mapper.writeValueAsString(appExecId);
+ String key = (APP_KEY_PREFIX + ";" + appExecJson);
+ return key.getBytes(Charsets.UTF_8);
+ }
+
+ private static AppExecId parseDbAppExecKey(String s) throws IOException {
+ if (!s.startsWith(APP_KEY_PREFIX)) {
+ throw new IllegalArgumentException("expected a string starting with " + APP_KEY_PREFIX);
+ }
+ String json = s.substring(APP_KEY_PREFIX.length() + 1);
+ AppExecId parsed = mapper.readValue(json, AppExecId.class);
+ return parsed;
+ }
+
+ @VisibleForTesting
+ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db)
+ throws IOException {
+ ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
+ if (db != null) {
+ DBIterator itr = db.iterator();
+ itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), Charsets.UTF_8);
+ if (!key.startsWith(APP_KEY_PREFIX)) {
+ break;
+ }
+ AppExecId id = parseDbAppExecKey(key);
+ ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
+ registeredExecutors.put(id, shuffleInfo);
+ }
+ }
+ return registeredExecutors;
+ }
+
+ private static class LevelDBLogger implements org.iq80.leveldb.Logger {
+ private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);
+
+ @Override
+ public void log(String message) {
+ LOG.info(message);
+ }
+ }
+
+ /**
+ * Simple major.minor versioning scheme. Any incompatible changes should be across major
+ * versions. Minor version differences are allowed -- meaning we should be able to read
+ * dbs that are either earlier *or* later on the minor version.
+ */
+ private static void checkVersion(DB db) throws IOException {
+ byte[] bytes = db.get(StoreVersion.KEY);
+ if (bytes == null) {
+ storeVersion(db);
+ } else {
+ StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
+ if (version.major != CURRENT_VERSION.major) {
+ throw new IOException("cannot read state DB with version " + version + ", incompatible " +
+ "with current version " + CURRENT_VERSION);
+ }
+ storeVersion(db);
+ }
+ }
+
+ private static void storeVersion(DB db) throws IOException {
+ db.put(StoreVersion.KEY, mapper.writeValueAsBytes(CURRENT_VERSION));
+ }
+
+ public static class StoreVersion {
+
+ final static byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8);
+
+ public final int major;
+ public final int minor;
+
+ @JsonCreator
+ public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
+ this.major = major;
+ this.minor = minor;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ StoreVersion that = (StoreVersion) o;
+
+ return major == that.major && minor == that.minor;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = major;
+ result = 31 * result + minor;
+ return result;
+ }
+ }
+
}
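
The resolver above combines two techniques: values are JSON-encoded with Jackson, and all registration keys share a common prefix so they can be recovered with a single leveldb prefix scan, which works because leveldb keeps keys sorted. A self-contained sketch of that scan pattern follows; the path, key, and value are illustrative only.

import java.io.File;
import java.util.Map;

import com.google.common.base.Charsets;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;

public class PrefixScanSketch {
  private static final String PREFIX = "AppExecShuffleInfo";

  public static void main(String[] args) throws Exception {
    Options options = new Options();
    options.createIfMissing(true);
    DB db = JniDBFactory.factory.open(new File("/tmp/prefix-scan-sketch.ldb"), options);
    try {
      // Store one entry the way the resolver does: prefixed key, JSON value.
      db.put((PREFIX + ";{\"appId\":\"app-1\",\"execId\":\"1\"}").getBytes(Charsets.UTF_8),
        "{\"shuffleManager\":\"sort\"}".getBytes(Charsets.UTF_8));
      // Seek to the first key >= PREFIX, then read until keys stop matching it.
      DBIterator itr = db.iterator();
      itr.seek(PREFIX.getBytes(Charsets.UTF_8));
      while (itr.hasNext()) {
        Map.Entry<byte[], byte[]> e = itr.next();
        String key = new String(e.getKey(), Charsets.UTF_8);
        if (!key.startsWith(PREFIX)) {
          break; // sorted keys mean we are past the prefix range
        }
        System.out.println(key + " -> " + new String(e.getValue(), Charsets.UTF_8));
      }
    } finally {
      db.close();
    }
  }
}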
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
index cadc8e8369..102d4efb8b 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -19,6 +19,8 @@ package org.apache.spark.network.shuffle.protocol;
import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
@@ -34,7 +36,11 @@ public class ExecutorShuffleInfo implements Encodable {
/** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
public final String shuffleManager;
- public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
+ @JsonCreator
+ public ExecutorShuffleInfo(
+ @JsonProperty("localDirs") String[] localDirs,
+ @JsonProperty("subDirsPerLocalDir") int subDirsPerLocalDir,
+ @JsonProperty("shuffleManager") String shuffleManager) {
this.localDirs = localDirs;
this.subDirsPerLocalDir = subDirsPerLocalDir;
this.shuffleManager = shuffleManager;
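
These annotations are what let Jackson construct an immutable object: ExecutorShuffleInfo's fields are final and it has no setters, so @JsonCreator marks which constructor to call and @JsonProperty maps each JSON field onto a parameter. A sketch of the same pattern on a made-up class:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class Point {
  public final int x;
  public final int y;

  // Jackson calls this constructor instead of requiring setters.
  @JsonCreator
  public Point(@JsonProperty("x") int x, @JsonProperty("y") int y) {
    this.x = x;
    this.y = y;
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    Point p = mapper.readValue("{\"x\": 1, \"y\": 2}", Point.class);
    System.out.println(p.x + "," + p.y); // prints 1,2
  }
}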
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index d02f4f0fdb..3c6cb367de 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -21,9 +21,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.CharStreams;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -59,8 +62,8 @@ public class ExternalShuffleBlockResolverSuite {
}
@Test
- public void testBadRequests() {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ public void testBadRequests() throws IOException {
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
// Unregistered executor
try {
resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
@@ -91,7 +94,7 @@ public class ExternalShuffleBlockResolverSuite {
@Test
public void testSortShuffleBlocks() throws IOException {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
@@ -110,7 +113,7 @@ public class ExternalShuffleBlockResolverSuite {
@Test
public void testHashShuffleBlocks() throws IOException {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
@@ -126,4 +129,28 @@ public class ExternalShuffleBlockResolverSuite {
block1Stream.close();
assertEquals(hashBlock1, block1);
}
+
+ @Test
+ public void jsonSerializationOfExecutorRegistration() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ AppExecId appId = new AppExecId("foo", "bar");
+ String appIdJson = mapper.writeValueAsString(appId);
+ AppExecId parsedAppId = mapper.readValue(appIdJson, AppExecId.class);
+ assertEquals(parsedAppId, appId);
+
+ ExecutorShuffleInfo shuffleInfo =
+ new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
+ String shuffleJson = mapper.writeValueAsString(shuffleInfo);
+ ExecutorShuffleInfo parsedShuffleInfo =
+ mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
+ assertEquals(parsedShuffleInfo, shuffleInfo);
+
+ // Intentionally keep these hard-coded strings in here, to check backwards-compatibility.
+ // It's not legacy yet, but we keep it here in case anybody changes the format.
+ String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
+ assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
+ String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
+ "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
+ assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
+ }
}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index d9d9c1bf2f..2f4f1d0df4 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -42,7 +42,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext = createSomeData();
ExternalShuffleBlockResolver resolver =
- new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
+ new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
resolver.applicationRemoved("app", false /* cleanup */);
@@ -65,7 +65,8 @@ public class ExternalShuffleCleanupSuite {
@Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
};
- ExternalShuffleBlockResolver manager = new ExternalShuffleBlockResolver(conf, noThreadExecutor);
+ ExternalShuffleBlockResolver manager =
+ new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
manager.applicationRemoved("app", true);
@@ -83,7 +84,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext1 = createSomeData();
ExternalShuffleBlockResolver resolver =
- new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
+ new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
@@ -99,7 +100,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext1 = createSomeData();
ExternalShuffleBlockResolver resolver =
- new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
+ new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 39aa49911d..a3f9a38b1a 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite {
dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
conf = new TransportConf(new SystemPropertyConfigProvider());
- handler = new ExternalShuffleBlockHandler(conf);
+ handler = new ExternalShuffleBlockHandler(conf, null);
TransportContext transportContext = new TransportContext(conf, handler);
server = transportContext.createServer();
}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index d4ec1956c1..aa99efda94 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -43,8 +43,9 @@ public class ExternalShuffleSecuritySuite {
TransportServer server;
@Before
- public void beforeEach() {
- TransportContext context = new TransportContext(conf, new ExternalShuffleBlockHandler(conf));
+ public void beforeEach() throws IOException {
+ TransportContext context =
+ new TransportContext(conf, new ExternalShuffleBlockHandler(conf, null));
TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf,
new TestSecretKeyHolder("my-app-id", "secret"));
this.server = context.createServer(Arrays.asList(bootstrap));
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 463f99ef33..11ea7f3fd3 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -17,25 +17,21 @@
package org.apache.spark.network.yarn;
+import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.apache.hadoop.yarn.server.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.sasl.SaslServerBootstrap;
import org.apache.spark.network.sasl.ShuffleSecretManager;
-import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
@@ -79,11 +75,26 @@ public class YarnShuffleService extends AuxiliaryService {
private TransportServer shuffleServer = null;
// Handles registering executors and opening shuffle blocks
- private ExternalShuffleBlockHandler blockHandler;
+ @VisibleForTesting
+ ExternalShuffleBlockHandler blockHandler;
+
+ // Where to store & reload executor info for recovering state after an NM restart
+ @VisibleForTesting
+ File registeredExecutorFile;
+
+ // Just for testing when you want to find an open port; records the port actually bound
+ @VisibleForTesting
+ static int boundPort = -1;
+
+ // Just for integration tests that want to look at the registered executor file -- in
+ // general a static reference like this is not sensible
+ @VisibleForTesting
+ static YarnShuffleService instance;
public YarnShuffleService() {
super("spark_shuffle");
logger.info("Initializing YARN shuffle service for Spark");
+ instance = this;
}
/**
@@ -100,11 +111,24 @@ public class YarnShuffleService extends AuxiliaryService {
*/
@Override
protected void serviceInit(Configuration conf) {
+
+ // In case this NM was killed while there were running Spark applications, we need to restore
+ // lost state for the existing executors. We look for an existing file in the NM's local dirs.
+ // If we don't find one, then we choose a file to use to save the state next time. Even if
+ // an application was stopped while the NM was down, we expect YARN to call stopApplication()
+ // when it comes back up.
+ registeredExecutorFile =
+ findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
+
TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
- blockHandler = new ExternalShuffleBlockHandler(transportConf);
+ try {
+ blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
+ } catch (Exception e) {
+ logger.error("Failed to initialize external shuffle service", e);
+ }
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
if (authEnabled) {
@@ -116,9 +140,13 @@ public class YarnShuffleService extends AuxiliaryService {
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
TransportContext transportContext = new TransportContext(transportConf, blockHandler);
shuffleServer = transportContext.createServer(port, bootstraps);
+ // The port should normally be fixed, but for tests it's useful to find an open port
+ port = shuffleServer.getPort();
+ boundPort = port;
String authEnabledString = authEnabled ? "enabled" : "not enabled";
logger.info("Started YARN shuffle service for Spark on port {}. " +
- "Authentication is {}.", port, authEnabledString);
+ "Authentication is {}. Registered executor file is {}", port, authEnabledString,
+ registeredExecutorFile);
}
@Override
@@ -161,6 +189,16 @@ public class YarnShuffleService extends AuxiliaryService {
logger.info("Stopping container {}", containerId);
}
+ private File findRegisteredExecutorFile(String[] localDirs) {
+ for (String dir: localDirs) {
+ File f = new File(dir, "registeredExecutors.ldb");
+ if (f.exists()) {
+ return f;
+ }
+ }
+ return new File(localDirs[0], "registeredExecutors.ldb");
+ }
+
/**
* Close the shuffle server to clean up any associated state.
*/
@@ -170,6 +208,9 @@ public class YarnShuffleService extends AuxiliaryService {
if (shuffleServer != null) {
shuffleServer.close();
}
+ if (blockHandler != null) {
+ blockHandler.close();
+ }
} catch (Exception e) {
logger.error("Exception when stopping service", e);
}
@@ -180,5 +221,4 @@ public class YarnShuffleService extends AuxiliaryService {
public ByteBuffer getMetaData() {
return ByteBuffer.allocate(0);
}
-
}
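
Finally, a hedged sketch of the restart flow this file enables: init the service (which creates registeredExecutors.ldb under the first NM local dir), stop it, then init a fresh instance against the same configuration and expect the same state file to be found. The class is illustrative only; it sits in the service's package so it can read the @VisibleForTesting field, and it assumes the local dir and the default shuffle port are available.

package org.apache.spark.network.yarn;

import java.io.File;

import org.apache.hadoop.conf.Configuration;

public class RecoverySketch {
  public static void main(String[] args) {
    new File("/tmp/nm-local").mkdirs();
    Configuration conf = new Configuration();
    conf.set("yarn.nodemanager.local-dirs", "/tmp/nm-local");

    YarnShuffleService s1 = new YarnShuffleService();
    s1.init(conf);                    // creates /tmp/nm-local/registeredExecutors.ldb
    File state = s1.registeredExecutorFile;
    s1.stop();                        // closes the server and the leveldb

    YarnShuffleService s2 = new YarnShuffleService();
    s2.init(conf);                    // finds the existing .ldb file again
    if (!state.equals(s2.registeredExecutorFile)) {
      throw new IllegalStateException("state file was not rediscovered");
    }
    s2.stop();
  }
}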