diff options
Diffstat (limited to 'network')
9 files changed, 353 insertions, 46 deletions
diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index 532463e96f..3d2edf9d94 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -43,6 +43,22 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.fusesource.leveldbjni</groupId> + <artifactId>leveldbjni-all</artifactId> + <version>1.8</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + <!-- Provided dependencies --> <dependency> <groupId>org.slf4j</groupId> diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index db9dc4f17c..0df1dd621f 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -17,11 +17,12 @@ package org.apache.spark.network.shuffle; +import java.io.File; +import java.io.IOException; import java.util.List; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import org.apache.spark.network.util.TransportConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,10 +32,10 @@ import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.server.OneForOneStreamManager; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.StreamManager; -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; -import org.apache.spark.network.shuffle.protocol.OpenBlocks; -import org.apache.spark.network.shuffle.protocol.RegisterExecutor; -import org.apache.spark.network.shuffle.protocol.StreamHandle; +import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId; +import org.apache.spark.network.shuffle.protocol.*; +import org.apache.spark.network.util.TransportConf; + /** * RPC Handler for a server which can serve shuffle blocks from outside of an Executor process. @@ -46,11 +47,13 @@ import org.apache.spark.network.shuffle.protocol.StreamHandle; public class ExternalShuffleBlockHandler extends RpcHandler { private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class); - private final ExternalShuffleBlockResolver blockManager; + @VisibleForTesting + final ExternalShuffleBlockResolver blockManager; private final OneForOneStreamManager streamManager; - public ExternalShuffleBlockHandler(TransportConf conf) { - this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf)); + public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException { + this(new OneForOneStreamManager(), + new ExternalShuffleBlockResolver(conf, registeredExecutorFile)); } /** Enables mocking out the StreamManager and BlockManager. */ @@ -105,4 +108,22 @@ public class ExternalShuffleBlockHandler extends RpcHandler { public void applicationRemoved(String appId, boolean cleanupLocalDirs) { blockManager.applicationRemoved(appId, cleanupLocalDirs); } + + /** + * Register an (application, executor) with the given shuffle info. + * + * The "re-" is meant to highlight the intended use of this method -- when this service is + * restarted, this is used to restore the state of executors from before the restart. Normal + * registration will happen via a message handled in receive() + * + * @param appExecId + * @param executorInfo + */ + public void reregisterExecutor(AppExecId appExecId, ExecutorShuffleInfo executorInfo) { + blockManager.registerExecutor(appExecId.appId, appExecId.execId, executorInfo); + } + + public void close() { + blockManager.close(); + } } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 022ed88a16..79beec4429 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -17,19 +17,24 @@ package org.apache.spark.network.shuffle; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; +import java.io.*; +import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; import com.google.common.base.Objects; import com.google.common.collect.Maps; +import org.fusesource.leveldbjni.JniDBFactory; +import org.fusesource.leveldbjni.internal.NativeDB; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,25 +57,87 @@ import org.apache.spark.network.util.TransportConf; public class ExternalShuffleBlockResolver { private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class); + private static final ObjectMapper mapper = new ObjectMapper(); + /** + * This a common prefix to the key for each app registration we stick in leveldb, so they + * are easy to find, since leveldb lets you search based on prefix. + */ + private static final String APP_KEY_PREFIX = "AppExecShuffleInfo"; + private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0); + // Map containing all registered executors' metadata. - private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors; + @VisibleForTesting + final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors; // Single-threaded Java executor used to perform expensive recursive directory deletion. private final Executor directoryCleaner; private final TransportConf conf; - public ExternalShuffleBlockResolver(TransportConf conf) { - this(conf, Executors.newSingleThreadExecutor( + @VisibleForTesting + final File registeredExecutorFile; + @VisibleForTesting + final DB db; + + public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) + throws IOException { + this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor( // Add `spark` prefix because it will run in NM in Yarn mode. NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner"))); } // Allows tests to have more control over when directories are cleaned up. @VisibleForTesting - ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) { + ExternalShuffleBlockResolver( + TransportConf conf, + File registeredExecutorFile, + Executor directoryCleaner) throws IOException { this.conf = conf; - this.executors = Maps.newConcurrentMap(); + this.registeredExecutorFile = registeredExecutorFile; + if (registeredExecutorFile != null) { + Options options = new Options(); + options.createIfMissing(false); + options.logger(new LevelDBLogger()); + DB tmpDb; + try { + tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options); + } catch (NativeDB.DBException e) { + if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { + logger.info("Creating state database at " + registeredExecutorFile); + options.createIfMissing(true); + try { + tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options); + } catch (NativeDB.DBException dbExc) { + throw new IOException("Unable to create state store", dbExc); + } + } else { + // the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new + // one, so we can keep processing new apps + logger.error("error opening leveldb file {}. Creating new file, will not be able to " + + "recover state for existing applications", registeredExecutorFile, e); + if (registeredExecutorFile.isDirectory()) { + for (File f : registeredExecutorFile.listFiles()) { + f.delete(); + } + } + registeredExecutorFile.delete(); + options.createIfMissing(true); + try { + tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options); + } catch (NativeDB.DBException dbExc) { + throw new IOException("Unable to create state store", dbExc); + } + + } + } + // if there is a version mismatch, we throw an exception, which means the service is unusable + checkVersion(tmpDb); + executors = reloadRegisteredExecutors(tmpDb); + db = tmpDb; + } else { + db = null; + executors = Maps.newConcurrentMap(); + } this.directoryCleaner = directoryCleaner; } @@ -81,6 +148,15 @@ public class ExternalShuffleBlockResolver { ExecutorShuffleInfo executorInfo) { AppExecId fullId = new AppExecId(appId, execId); logger.info("Registered executor {} with {}", fullId, executorInfo); + try { + if (db != null) { + byte[] key = dbAppExecKey(fullId); + byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8); + db.put(key, value); + } + } catch (Exception e) { + logger.error("Error saving registered executors", e); + } executors.put(fullId, executorInfo); } @@ -136,6 +212,13 @@ public class ExternalShuffleBlockResolver { // Only touch executors associated with the appId that was removed. if (appId.equals(fullId.appId)) { it.remove(); + if (db != null) { + try { + db.delete(dbAppExecKey(fullId)); + } catch (IOException e) { + logger.error("Error deleting {} from executor state db", appId, e); + } + } if (cleanupLocalDirs) { logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length); @@ -220,12 +303,23 @@ public class ExternalShuffleBlockResolver { return new File(new File(localDir, String.format("%02x", subDirId)), filename); } + void close() { + if (db != null) { + try { + db.close(); + } catch (IOException e) { + logger.error("Exception closing leveldb with registered executors", e); + } + } + } + /** Simply encodes an executor's full ID, which is appId + execId. */ - private static class AppExecId { - final String appId; - final String execId; + public static class AppExecId { + public final String appId; + public final String execId; - private AppExecId(String appId, String execId) { + @JsonCreator + public AppExecId(@JsonProperty("appId") String appId, @JsonProperty("execId") String execId) { this.appId = appId; this.execId = execId; } @@ -252,4 +346,105 @@ public class ExternalShuffleBlockResolver { .toString(); } } + + private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException { + // we stick a common prefix on all the keys so we can find them in the DB + String appExecJson = mapper.writeValueAsString(appExecId); + String key = (APP_KEY_PREFIX + ";" + appExecJson); + return key.getBytes(Charsets.UTF_8); + } + + private static AppExecId parseDbAppExecKey(String s) throws IOException { + if (!s.startsWith(APP_KEY_PREFIX)) { + throw new IllegalArgumentException("expected a string starting with " + APP_KEY_PREFIX); + } + String json = s.substring(APP_KEY_PREFIX.length() + 1); + AppExecId parsed = mapper.readValue(json, AppExecId.class); + return parsed; + } + + @VisibleForTesting + static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db) + throws IOException { + ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap(); + if (db != null) { + DBIterator itr = db.iterator(); + itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8)); + while (itr.hasNext()) { + Map.Entry<byte[], byte[]> e = itr.next(); + String key = new String(e.getKey(), Charsets.UTF_8); + if (!key.startsWith(APP_KEY_PREFIX)) { + break; + } + AppExecId id = parseDbAppExecKey(key); + ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class); + registeredExecutors.put(id, shuffleInfo); + } + } + return registeredExecutors; + } + + private static class LevelDBLogger implements org.iq80.leveldb.Logger { + private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class); + + @Override + public void log(String message) { + LOG.info(message); + } + } + + /** + * Simple major.minor versioning scheme. Any incompatible changes should be across major + * versions. Minor version differences are allowed -- meaning we should be able to read + * dbs that are either earlier *or* later on the minor version. + */ + private static void checkVersion(DB db) throws IOException { + byte[] bytes = db.get(StoreVersion.KEY); + if (bytes == null) { + storeVersion(db); + } else { + StoreVersion version = mapper.readValue(bytes, StoreVersion.class); + if (version.major != CURRENT_VERSION.major) { + throw new IOException("cannot read state DB with version " + version + ", incompatible " + + "with current version " + CURRENT_VERSION); + } + storeVersion(db); + } + } + + private static void storeVersion(DB db) throws IOException { + db.put(StoreVersion.KEY, mapper.writeValueAsBytes(CURRENT_VERSION)); + } + + + public static class StoreVersion { + + final static byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8); + + public final int major; + public final int minor; + + @JsonCreator public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) { + this.major = major; + this.minor = minor; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + StoreVersion that = (StoreVersion) o; + + return major == that.major && minor == that.minor; + } + + @Override + public int hashCode() { + int result = major; + result = 31 * result + minor; + return result; + } + } + } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java index cadc8e8369..102d4efb8b 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java @@ -19,6 +19,8 @@ package org.apache.spark.network.shuffle.protocol; import java.util.Arrays; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; @@ -34,7 +36,11 @@ public class ExecutorShuffleInfo implements Encodable { /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */ public final String shuffleManager; - public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) { + @JsonCreator + public ExecutorShuffleInfo( + @JsonProperty("localDirs") String[] localDirs, + @JsonProperty("subDirsPerLocalDir") int subDirsPerLocalDir, + @JsonProperty("shuffleManager") String shuffleManager) { this.localDirs = localDirs; this.subDirsPerLocalDir = subDirsPerLocalDir; this.shuffleManager = shuffleManager; diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index d02f4f0fdb..3c6cb367de 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -21,9 +21,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.CharStreams; +import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; import org.apache.spark.network.util.SystemPropertyConfigProvider; import org.apache.spark.network.util.TransportConf; +import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -59,8 +62,8 @@ public class ExternalShuffleBlockResolverSuite { } @Test - public void testBadRequests() { - ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf); + public void testBadRequests() throws IOException { + ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null); // Unregistered executor try { resolver.getBlockData("app0", "exec1", "shuffle_1_1_0"); @@ -91,7 +94,7 @@ public class ExternalShuffleBlockResolverSuite { @Test public void testSortShuffleBlocks() throws IOException { - ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf); + ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null); resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); @@ -110,7 +113,7 @@ public class ExternalShuffleBlockResolverSuite { @Test public void testHashShuffleBlocks() throws IOException { - ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf); + ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null); resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager")); @@ -126,4 +129,28 @@ public class ExternalShuffleBlockResolverSuite { block1Stream.close(); assertEquals(hashBlock1, block1); } + + @Test + public void jsonSerializationOfExecutorRegistration() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + AppExecId appId = new AppExecId("foo", "bar"); + String appIdJson = mapper.writeValueAsString(appId); + AppExecId parsedAppId = mapper.readValue(appIdJson, AppExecId.class); + assertEquals(parsedAppId, appId); + + ExecutorShuffleInfo shuffleInfo = + new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash"); + String shuffleJson = mapper.writeValueAsString(shuffleInfo); + ExecutorShuffleInfo parsedShuffleInfo = + mapper.readValue(shuffleJson, ExecutorShuffleInfo.class); + assertEquals(parsedShuffleInfo, shuffleInfo); + + // Intentionally keep these hard-coded strings in here, to check backwards-compatability. + // its not legacy yet, but keeping this here in case anybody changes it + String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}"; + assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class)); + String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " + + "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}"; + assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class)); + } } diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index d9d9c1bf2f..2f4f1d0df4 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -42,7 +42,7 @@ public class ExternalShuffleCleanupSuite { TestShuffleDataContext dataContext = createSomeData(); ExternalShuffleBlockResolver resolver = - new ExternalShuffleBlockResolver(conf, sameThreadExecutor); + new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor); resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); resolver.applicationRemoved("app", false /* cleanup */); @@ -65,7 +65,8 @@ public class ExternalShuffleCleanupSuite { @Override public void execute(Runnable runnable) { cleanupCalled.set(true); } }; - ExternalShuffleBlockResolver manager = new ExternalShuffleBlockResolver(conf, noThreadExecutor); + ExternalShuffleBlockResolver manager = + new ExternalShuffleBlockResolver(conf, null, noThreadExecutor); manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); manager.applicationRemoved("app", true); @@ -83,7 +84,7 @@ public class ExternalShuffleCleanupSuite { TestShuffleDataContext dataContext1 = createSomeData(); ExternalShuffleBlockResolver resolver = - new ExternalShuffleBlockResolver(conf, sameThreadExecutor); + new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor); resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr")); @@ -99,7 +100,7 @@ public class ExternalShuffleCleanupSuite { TestShuffleDataContext dataContext1 = createSomeData(); ExternalShuffleBlockResolver resolver = - new ExternalShuffleBlockResolver(conf, sameThreadExecutor); + new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor); resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr")); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 39aa49911d..a3f9a38b1a 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite { dataContext1.insertHashShuffleData(1, 0, exec1Blocks); conf = new TransportConf(new SystemPropertyConfigProvider()); - handler = new ExternalShuffleBlockHandler(conf); + handler = new ExternalShuffleBlockHandler(conf, null); TransportContext transportContext = new TransportContext(conf, handler); server = transportContext.createServer(); } diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index d4ec1956c1..aa99efda94 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -43,8 +43,9 @@ public class ExternalShuffleSecuritySuite { TransportServer server; @Before - public void beforeEach() { - TransportContext context = new TransportContext(conf, new ExternalShuffleBlockHandler(conf)); + public void beforeEach() throws IOException { + TransportContext context = + new TransportContext(conf, new ExternalShuffleBlockHandler(conf, null)); TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf, new TestSecretKeyHolder("my-app-id", "secret")); this.server = context.createServer(Arrays.asList(bootstrap)); diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 463f99ef33..11ea7f3fd3 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -17,25 +17,21 @@ package org.apache.spark.network.yarn; +import java.io.File; import java.nio.ByteBuffer; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.server.api.AuxiliaryService; -import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; -import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; -import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; -import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.apache.hadoop.yarn.server.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.network.TransportContext; import org.apache.spark.network.sasl.SaslServerBootstrap; import org.apache.spark.network.sasl.ShuffleSecretManager; -import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; @@ -79,11 +75,26 @@ public class YarnShuffleService extends AuxiliaryService { private TransportServer shuffleServer = null; // Handles registering executors and opening shuffle blocks - private ExternalShuffleBlockHandler blockHandler; + @VisibleForTesting + ExternalShuffleBlockHandler blockHandler; + + // Where to store & reload executor info for recovering state after an NM restart + @VisibleForTesting + File registeredExecutorFile; + + // just for testing when you want to find an open port + @VisibleForTesting + static int boundPort = -1; + + // just for integration tests that want to look at this file -- in general not sensible as + // a static + @VisibleForTesting + static YarnShuffleService instance; public YarnShuffleService() { super("spark_shuffle"); logger.info("Initializing YARN shuffle service for Spark"); + instance = this; } /** @@ -100,11 +111,24 @@ public class YarnShuffleService extends AuxiliaryService { */ @Override protected void serviceInit(Configuration conf) { + + // In case this NM was killed while there were running spark applications, we need to restore + // lost state for the existing executors. We look for an existing file in the NM's local dirs. + // If we don't find one, then we choose a file to use to save the state next time. Even if + // an application was stopped while the NM was down, we expect yarn to call stopApplication() + // when it comes back + registeredExecutorFile = + findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs")); + TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf)); // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); - blockHandler = new ExternalShuffleBlockHandler(transportConf); + try { + blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); + } catch (Exception e) { + logger.error("Failed to initialize external shuffle service", e); + } List<TransportServerBootstrap> bootstraps = Lists.newArrayList(); if (authEnabled) { @@ -116,9 +140,13 @@ public class YarnShuffleService extends AuxiliaryService { SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); TransportContext transportContext = new TransportContext(transportConf, blockHandler); shuffleServer = transportContext.createServer(port, bootstraps); + // the port should normally be fixed, but for tests its useful to find an open port + port = shuffleServer.getPort(); + boundPort = port; String authEnabledString = authEnabled ? "enabled" : "not enabled"; logger.info("Started YARN shuffle service for Spark on port {}. " + - "Authentication is {}.", port, authEnabledString); + "Authentication is {}. Registered executor file is {}", port, authEnabledString, + registeredExecutorFile); } @Override @@ -161,6 +189,16 @@ public class YarnShuffleService extends AuxiliaryService { logger.info("Stopping container {}", containerId); } + private File findRegisteredExecutorFile(String[] localDirs) { + for (String dir: localDirs) { + File f = new File(dir, "registeredExecutors.ldb"); + if (f.exists()) { + return f; + } + } + return new File(localDirs[0], "registeredExecutors.ldb"); + } + /** * Close the shuffle server to clean up any associated state. */ @@ -170,6 +208,9 @@ public class YarnShuffleService extends AuxiliaryService { if (shuffleServer != null) { shuffleServer.close(); } + if (blockHandler != null) { + blockHandler.close(); + } } catch (Exception e) { logger.error("Exception when stopping service", e); } @@ -180,5 +221,4 @@ public class YarnShuffleService extends AuxiliaryService { public ByteBuffer getMetaData() { return ByteBuffer.allocate(0); } - } |