aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/network-common/pom.xml16
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java152
-rw-r--r--common/network-shuffle/pom.xml16
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java120
-rw-r--r--common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java135
5 files changed, 301 insertions, 138 deletions
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 81f0c6e226..fcefe64d59 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -45,6 +45,22 @@
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ <version>1.8</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java
new file mode 100644
index 0000000000..ec900a7b3c
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LevelDB utility class available in the network package.
+ */
+public class LevelDBProvider {
+ private static final Logger logger = LoggerFactory.getLogger(LevelDBProvider.class);
+
+ public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper mapper) throws
+ IOException {
+ DB tmpDb = null;
+ if (dbFile != null) {
+ Options options = new Options();
+ options.createIfMissing(false);
+ options.logger(new LevelDBLogger());
+ try {
+ tmpDb = JniDBFactory.factory.open(dbFile, options);
+ } catch (NativeDB.DBException e) {
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+ logger.info("Creating state database at " + dbFile);
+ options.createIfMissing(true);
+ try {
+ tmpDb = JniDBFactory.factory.open(dbFile, options);
+ } catch (NativeDB.DBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+ } else {
+ // the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
+ // one, so we can keep processing new apps
+ logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
+ "recover state for existing applications", dbFile, e);
+ if (dbFile.isDirectory()) {
+ for (File f : dbFile.listFiles()) {
+ if (!f.delete()) {
+ logger.warn("error deleting {}", f.getPath());
+ }
+ }
+ }
+ if (!dbFile.delete()) {
+ logger.warn("error deleting {}", dbFile.getPath());
+ }
+ options.createIfMissing(true);
+ try {
+ tmpDb = JniDBFactory.factory.open(dbFile, options);
+ } catch (NativeDB.DBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+
+ }
+ }
+ // if there is a version mismatch, we throw an exception, which means the service is unusable
+ checkVersion(tmpDb, version, mapper);
+ }
+ return tmpDb;
+ }
+
+ private static class LevelDBLogger implements org.iq80.leveldb.Logger {
+ private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);
+
+ @Override
+ public void log(String message) {
+ LOG.info(message);
+ }
+ }
+
+ /**
+ * Simple major.minor versioning scheme. Any incompatible changes should be across major
+ * versions. Minor version differences are allowed -- meaning we should be able to read
+ * dbs that are either earlier *or* later on the minor version.
+ */
+ public static void checkVersion(DB db, StoreVersion newversion, ObjectMapper mapper) throws
+ IOException {
+ byte[] bytes = db.get(StoreVersion.KEY);
+ if (bytes == null) {
+ storeVersion(db, newversion, mapper);
+ } else {
+ StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
+ if (version.major != newversion.major) {
+ throw new IOException("cannot read state DB with version " + version + ", incompatible " +
+ "with current version " + newversion);
+ }
+ storeVersion(db, newversion, mapper);
+ }
+ }
+
+ public static void storeVersion(DB db, StoreVersion version, ObjectMapper mapper)
+ throws IOException {
+ db.put(StoreVersion.KEY, mapper.writeValueAsBytes(version));
+ }
+
+ public static class StoreVersion {
+
+ final static byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
+
+ public final int major;
+ public final int minor;
+
+ @JsonCreator
+ public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
+ this.major = major;
+ this.minor = minor;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ StoreVersion that = (StoreVersion) o;
+
+ return major == that.major && minor == that.minor;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = major;
+ result = 31 * result + minor;
+ return result;
+ }
+ }
+}
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index d211bd5bd1..511e1f29de 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -43,26 +43,10 @@
</dependency>
<dependency>
- <groupId>org.fusesource.leveldbjni</groupId>
- <artifactId>leveldbjni-all</artifactId>
- <version>1.8</version>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
-
<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index d436711692..25e9abde70 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -34,17 +34,16 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.LevelDBProvider;
+import org.apache.spark.network.util.LevelDBProvider.StoreVersion;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
@@ -114,52 +113,10 @@ public class ExternalShuffleBlockResolver {
};
shuffleIndexCache = CacheBuilder.newBuilder()
.maximumSize(indexCacheEntries).build(indexCacheLoader);
- if (registeredExecutorFile != null) {
- Options options = new Options();
- options.createIfMissing(false);
- options.logger(new LevelDBLogger());
- DB tmpDb;
- try {
- tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
- } catch (NativeDB.DBException e) {
- if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
- logger.info("Creating state database at " + registeredExecutorFile);
- options.createIfMissing(true);
- try {
- tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
- } catch (NativeDB.DBException dbExc) {
- throw new IOException("Unable to create state store", dbExc);
- }
- } else {
- // the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
- // one, so we can keep processing new apps
- logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
- "recover state for existing applications", registeredExecutorFile, e);
- if (registeredExecutorFile.isDirectory()) {
- for (File f : registeredExecutorFile.listFiles()) {
- if (!f.delete()) {
- logger.warn("error deleting {}", f.getPath());
- }
- }
- }
- if (!registeredExecutorFile.delete()) {
- logger.warn("error deleting {}", registeredExecutorFile.getPath());
- }
- options.createIfMissing(true);
- try {
- tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
- } catch (NativeDB.DBException dbExc) {
- throw new IOException("Unable to create state store", dbExc);
- }
-
- }
- }
- // if there is a version mismatch, we throw an exception, which means the service is unusable
- checkVersion(tmpDb);
- executors = reloadRegisteredExecutors(tmpDb);
- db = tmpDb;
+ db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
+ if (db != null) {
+ executors = reloadRegisteredExecutors(db);
} else {
- db = null;
executors = Maps.newConcurrentMap();
}
this.directoryCleaner = directoryCleaner;
@@ -384,76 +341,11 @@ public class ExternalShuffleBlockResolver {
break;
}
AppExecId id = parseDbAppExecKey(key);
+ logger.info("Reloading registered executors: " + id.toString());
ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
registeredExecutors.put(id, shuffleInfo);
}
}
return registeredExecutors;
}
-
- private static class LevelDBLogger implements org.iq80.leveldb.Logger {
- private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);
-
- @Override
- public void log(String message) {
- LOG.info(message);
- }
- }
-
- /**
- * Simple major.minor versioning scheme. Any incompatible changes should be across major
- * versions. Minor version differences are allowed -- meaning we should be able to read
- * dbs that are either earlier *or* later on the minor version.
- */
- private static void checkVersion(DB db) throws IOException {
- byte[] bytes = db.get(StoreVersion.KEY);
- if (bytes == null) {
- storeVersion(db);
- } else {
- StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
- if (version.major != CURRENT_VERSION.major) {
- throw new IOException("cannot read state DB with version " + version + ", incompatible " +
- "with current version " + CURRENT_VERSION);
- }
- storeVersion(db);
- }
- }
-
- private static void storeVersion(DB db) throws IOException {
- db.put(StoreVersion.KEY, mapper.writeValueAsBytes(CURRENT_VERSION));
- }
-
-
- public static class StoreVersion {
-
- static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
-
- public final int major;
- public final int minor;
-
- @JsonCreator public StoreVersion(
- @JsonProperty("major") int major,
- @JsonProperty("minor") int minor) {
- this.major = major;
- this.minor = minor;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- StoreVersion that = (StoreVersion) o;
-
- return major == that.major && minor == that.minor;
- }
-
- @Override
- public int hashCode() {
- int result = major;
- result = 31 * result + minor;
- return result;
- }
- }
-
}
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 2cf3f53e6d..df082e4a92 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -18,15 +18,28 @@
package org.apache.spark.network.yarn;
import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.*;
+import org.apache.spark.network.util.LevelDBProvider;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,12 +82,26 @@ public class YarnShuffleService extends AuxiliaryService {
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
+ private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";
// Whether failure during service initialization should stop the NM.
@VisibleForTesting
static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
private static final boolean DEFAULT_STOP_ON_FAILURE = false;
+ // just for testing when you want to find an open port
+ @VisibleForTesting
+ static int boundPort = -1;
+ private static final ObjectMapper mapper = new ObjectMapper();
+ private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
+ private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
+ .StoreVersion(1, 0);
+
+ // just for integration tests that want to look at this file -- in general not sensible as
+ // a static
+ @VisibleForTesting
+ static YarnShuffleService instance;
+
// An entity that manages the shuffle secret per application
// This is used only if authentication is enabled
private ShuffleSecretManager secretManager;
@@ -96,14 +123,11 @@ public class YarnShuffleService extends AuxiliaryService {
@VisibleForTesting
File registeredExecutorFile;
- // just for testing when you want to find an open port
+ // Where to store & reload application secrets for recovering state after an NM restart
@VisibleForTesting
- static int boundPort = -1;
+ File secretsFile;
- // just for integration tests that want to look at this file -- in general not sensible as
- // a static
- @VisibleForTesting
- static YarnShuffleService instance;
+ private DB db;
public YarnShuffleService() {
super("spark_shuffle");
@@ -143,10 +167,10 @@ public class YarnShuffleService extends AuxiliaryService {
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
- boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
+ boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
if (authEnabled) {
- secretManager = new ShuffleSecretManager();
+ createSecretManager();
bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
}
@@ -170,6 +194,50 @@ public class YarnShuffleService extends AuxiliaryService {
}
}
+ private void createSecretManager() throws IOException {
+ secretManager = new ShuffleSecretManager();
+ secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME);
+
+ // Make sure this is protected in case its not in the NM recovery dir
+ FileSystem fs = FileSystem.getLocal(_conf);
+ fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
+
+ db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
+ logger.info("Recovery location is: " + secretsFile.getPath());
+ if (db != null) {
+ logger.info("Going to reload spark shuffle data");
+ DBIterator itr = db.iterator();
+ itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+ while (itr.hasNext()) {
+ Map.Entry<byte[], byte[]> e = itr.next();
+ String key = new String(e.getKey(), StandardCharsets.UTF_8);
+ if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
+ break;
+ }
+ String id = parseDbAppKey(key);
+ ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
+ logger.info("Reloading tokens for app: " + id);
+ secretManager.registerApp(id, secret);
+ }
+ }
+ }
+
+ private static String parseDbAppKey(String s) throws IOException {
+ if (!s.startsWith(APP_CREDS_KEY_PREFIX)) {
+ throw new IllegalArgumentException("expected a string starting with " + APP_CREDS_KEY_PREFIX);
+ }
+ String json = s.substring(APP_CREDS_KEY_PREFIX.length() + 1);
+ AppId parsed = mapper.readValue(json, AppId.class);
+ return parsed.appId;
+ }
+
+ private static byte[] dbAppKey(AppId appExecId) throws IOException {
+ // we stick a common prefix on all the keys so we can find them in the DB
+ String appExecJson = mapper.writeValueAsString(appExecId);
+ String key = (APP_CREDS_KEY_PREFIX + ";" + appExecJson);
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+
@Override
public void initializeApplication(ApplicationInitializationContext context) {
String appId = context.getApplicationId().toString();
@@ -177,6 +245,12 @@ public class YarnShuffleService extends AuxiliaryService {
ByteBuffer shuffleSecret = context.getApplicationDataForService();
logger.info("Initializing application {}", appId);
if (isAuthenticationEnabled()) {
+ AppId fullId = new AppId(appId);
+ if (db != null) {
+ byte[] key = dbAppKey(fullId);
+ byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
+ db.put(key, value);
+ }
secretManager.registerApp(appId, shuffleSecret);
}
} catch (Exception e) {
@@ -190,6 +264,14 @@ public class YarnShuffleService extends AuxiliaryService {
try {
logger.info("Stopping application {}", appId);
if (isAuthenticationEnabled()) {
+ AppId fullId = new AppId(appId);
+ if (db != null) {
+ try {
+ db.delete(dbAppKey(fullId));
+ } catch (IOException e) {
+ logger.error("Error deleting {} from executor state db", appId, e);
+ }
+ }
secretManager.unregisterApp(appId);
}
blockHandler.applicationRemoved(appId, false /* clean up local dirs */);
@@ -222,6 +304,9 @@ public class YarnShuffleService extends AuxiliaryService {
if (blockHandler != null) {
blockHandler.close();
}
+ if (db != null) {
+ db.close();
+ }
} catch (Exception e) {
logger.error("Exception when stopping service", e);
}
@@ -275,4 +360,38 @@ public class YarnShuffleService extends AuxiliaryService {
return _recoveryPath;
}
+
+ /**
+ * Simply encodes an application ID.
+ */
+ public static class AppId {
+ public final String appId;
+
+ @JsonCreator
+ public AppId(@JsonProperty("appId") String appId) {
+ this.appId = appId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AppId appExecId = (AppId) o;
+ return Objects.equal(appId, appExecId.appId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(appId);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("appId", appId)
+ .toString();
+ }
+ }
+
}