diff options
Diffstat (limited to 'common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java')
-rw-r--r-- | common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java | 120 |
1 files changed, 6 insertions, 114 deletions
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index d436711692..25e9abde70 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -34,17 +34,16 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Maps; -import org.fusesource.leveldbjni.JniDBFactory; -import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; -import org.iq80.leveldb.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; +import org.apache.spark.network.util.LevelDBProvider; +import org.apache.spark.network.util.LevelDBProvider.StoreVersion; import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.NettyUtils; import org.apache.spark.network.util.TransportConf; @@ -114,52 +113,10 @@ public class ExternalShuffleBlockResolver { }; shuffleIndexCache = CacheBuilder.newBuilder() .maximumSize(indexCacheEntries).build(indexCacheLoader); - if (registeredExecutorFile != null) { - Options options = new Options(); - options.createIfMissing(false); - options.logger(new LevelDBLogger()); - DB tmpDb; - try { - tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options); - } catch (NativeDB.DBException e) { - if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { - logger.info("Creating state database at " + registeredExecutorFile); - options.createIfMissing(true); - try { - tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options); - } catch (NativeDB.DBException dbExc) { - throw new IOException("Unable to create state store", dbExc); - } - } else { - // the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new - // one, so we can keep processing new apps - logger.error("error opening leveldb file {}. Creating new file, will not be able to " + - "recover state for existing applications", registeredExecutorFile, e); - if (registeredExecutorFile.isDirectory()) { - for (File f : registeredExecutorFile.listFiles()) { - if (!f.delete()) { - logger.warn("error deleting {}", f.getPath()); - } - } - } - if (!registeredExecutorFile.delete()) { - logger.warn("error deleting {}", registeredExecutorFile.getPath()); - } - options.createIfMissing(true); - try { - tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options); - } catch (NativeDB.DBException dbExc) { - throw new IOException("Unable to create state store", dbExc); - } - - } - } - // if there is a version mismatch, we throw an exception, which means the service is unusable - checkVersion(tmpDb); - executors = reloadRegisteredExecutors(tmpDb); - db = tmpDb; + db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper); + if (db != null) { + executors = reloadRegisteredExecutors(db); } else { - db = null; executors = Maps.newConcurrentMap(); } this.directoryCleaner = directoryCleaner; @@ -384,76 +341,11 @@ public class ExternalShuffleBlockResolver { break; } AppExecId id = parseDbAppExecKey(key); + logger.info("Reloading registered executors: " + id.toString()); ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class); registeredExecutors.put(id, shuffleInfo); } } return registeredExecutors; } - - private static class LevelDBLogger implements org.iq80.leveldb.Logger { - private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class); - - @Override - public void log(String message) { - LOG.info(message); - } - } - - /** - * Simple major.minor versioning scheme. Any incompatible changes should be across major - * versions. Minor version differences are allowed -- meaning we should be able to read - * dbs that are either earlier *or* later on the minor version. - */ - private static void checkVersion(DB db) throws IOException { - byte[] bytes = db.get(StoreVersion.KEY); - if (bytes == null) { - storeVersion(db); - } else { - StoreVersion version = mapper.readValue(bytes, StoreVersion.class); - if (version.major != CURRENT_VERSION.major) { - throw new IOException("cannot read state DB with version " + version + ", incompatible " + - "with current version " + CURRENT_VERSION); - } - storeVersion(db); - } - } - - private static void storeVersion(DB db) throws IOException { - db.put(StoreVersion.KEY, mapper.writeValueAsBytes(CURRENT_VERSION)); - } - - - public static class StoreVersion { - - static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8); - - public final int major; - public final int minor; - - @JsonCreator public StoreVersion( - @JsonProperty("major") int major, - @JsonProperty("minor") int minor) { - this.major = major; - this.minor = minor; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - StoreVersion that = (StoreVersion) o; - - return major == that.major && minor == that.minor; - } - - @Override - public int hashCode() { - int result = major; - result = 31 * result + minor; - return result; - } - } - } |