aboutsummaryrefslogtreecommitdiff
path: root/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
diff options
context:
space:
mode:
Diffstat (limited to 'common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java')
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java120
1 files changed, 6 insertions, 114 deletions
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index d436711692..25e9abde70 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -34,17 +34,16 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
-import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.LevelDBProvider;
+import org.apache.spark.network.util.LevelDBProvider.StoreVersion;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
@@ -114,52 +113,10 @@ public class ExternalShuffleBlockResolver {
};
shuffleIndexCache = CacheBuilder.newBuilder()
.maximumSize(indexCacheEntries).build(indexCacheLoader);
- if (registeredExecutorFile != null) {
- Options options = new Options();
- options.createIfMissing(false);
- options.logger(new LevelDBLogger());
- DB tmpDb;
- try {
- tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
- } catch (NativeDB.DBException e) {
- if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
- logger.info("Creating state database at " + registeredExecutorFile);
- options.createIfMissing(true);
- try {
- tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
- } catch (NativeDB.DBException dbExc) {
- throw new IOException("Unable to create state store", dbExc);
- }
- } else {
- // the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
- // one, so we can keep processing new apps
- logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
- "recover state for existing applications", registeredExecutorFile, e);
- if (registeredExecutorFile.isDirectory()) {
- for (File f : registeredExecutorFile.listFiles()) {
- if (!f.delete()) {
- logger.warn("error deleting {}", f.getPath());
- }
- }
- }
- if (!registeredExecutorFile.delete()) {
- logger.warn("error deleting {}", registeredExecutorFile.getPath());
- }
- options.createIfMissing(true);
- try {
- tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
- } catch (NativeDB.DBException dbExc) {
- throw new IOException("Unable to create state store", dbExc);
- }
-
- }
- }
- // if there is a version mismatch, we throw an exception, which means the service is unusable
- checkVersion(tmpDb);
- executors = reloadRegisteredExecutors(tmpDb);
- db = tmpDb;
+ db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
+ if (db != null) {
+ executors = reloadRegisteredExecutors(db);
} else {
- db = null;
executors = Maps.newConcurrentMap();
}
this.directoryCleaner = directoryCleaner;
@@ -384,76 +341,11 @@ public class ExternalShuffleBlockResolver {
break;
}
AppExecId id = parseDbAppExecKey(key);
+ logger.info("Reloading registered executors: " + id.toString());
ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
registeredExecutors.put(id, shuffleInfo);
}
}
return registeredExecutors;
}
-
- private static class LevelDBLogger implements org.iq80.leveldb.Logger {
- private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);
-
- @Override
- public void log(String message) {
- LOG.info(message);
- }
- }
-
- /**
- * Simple major.minor versioning scheme. Any incompatible changes should be across major
- * versions. Minor version differences are allowed -- meaning we should be able to read
- * dbs that are either earlier *or* later on the minor version.
- */
- private static void checkVersion(DB db) throws IOException {
- byte[] bytes = db.get(StoreVersion.KEY);
- if (bytes == null) {
- storeVersion(db);
- } else {
- StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
- if (version.major != CURRENT_VERSION.major) {
- throw new IOException("cannot read state DB with version " + version + ", incompatible " +
- "with current version " + CURRENT_VERSION);
- }
- storeVersion(db);
- }
- }
-
- private static void storeVersion(DB db) throws IOException {
- db.put(StoreVersion.KEY, mapper.writeValueAsBytes(CURRENT_VERSION));
- }
-
-
- public static class StoreVersion {
-
- static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
-
- public final int major;
- public final int minor;
-
- @JsonCreator public StoreVersion(
- @JsonProperty("major") int major,
- @JsonProperty("minor") int minor) {
- this.major = major;
- this.minor = minor;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- StoreVersion that = (StoreVersion) o;
-
- return major == that.major && minor == that.minor;
- }
-
- @Override
- public int hashCode() {
- int result = major;
- result = 31 * result + minor;
- return result;
- }
- }
-
}