aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorSital Kedia <skedia@fb.com>2016-08-04 14:54:38 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-08-04 14:54:38 -0700
commit9c15d079df2418a1412269a702f3a7861daee61c (patch)
tree713b294ebc99767ad3143df8815bbe2014f2cd3e /common
parent0e2e5d7d0b42226c61c3200fd63d2831c558519d (diff)
downloadspark-9c15d079df2418a1412269a702f3a7861daee61c.tar.gz
spark-9c15d079df2418a1412269a702f3a7861daee61c.tar.bz2
spark-9c15d079df2418a1412269a702f3a7861daee61c.zip
[SPARK-15074][SHUFFLE] Cache shuffle index file to speedup shuffle fetch
## What changes were proposed in this pull request? Shuffle fetch on large intermediate dataset is slow because the shuffle service open/close the index file for each shuffle fetch. This change introduces a cache for the index information so that we can avoid accessing the index files for each block fetch ## How was this patch tested? Tested by running a job on the cluster and the shuffle read time was reduced by 50%. Author: Sital Kedia <skedia@fb.com> Closes #12944 from sitalkedia/shuffle_service.
Diffstat (limited to 'common')
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java4
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java36
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java63
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java40
4 files changed, 131 insertions, 12 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 9f030da2b3..0efc400aa3 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -60,6 +60,10 @@ public class TransportConf {
SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
}
+ public int getInt(String name, int defaultValue) {
+ return conf.getInt(name, defaultValue);
+ }
+
private String getConfKey(String suffix) {
return "spark." + module + "." + suffix;
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 7eefccaaed..56cf1e2e3e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -21,6 +21,7 @@ import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -29,6 +30,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
@@ -66,6 +70,12 @@ public class ExternalShuffleBlockResolver {
@VisibleForTesting
final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
+ /**
+ * Caches index file information so that we can avoid open/close the index files
+ * for each block fetch.
+ */
+ private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;
+
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;
@@ -95,6 +105,15 @@ public class ExternalShuffleBlockResolver {
Executor directoryCleaner) throws IOException {
this.conf = conf;
this.registeredExecutorFile = registeredExecutorFile;
+ int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
+ CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
+ new CacheLoader<File, ShuffleIndexInformation>() {
+ public ShuffleIndexInformation load(File file) throws IOException {
+ return new ShuffleIndexInformation(file);
+ }
+ };
+ shuffleIndexCache = CacheBuilder.newBuilder()
+ .maximumSize(indexCacheEntries).build(indexCacheLoader);
if (registeredExecutorFile != null) {
Options options = new Options();
options.createIfMissing(false);
@@ -265,24 +284,17 @@ public class ExternalShuffleBlockResolver {
File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.index");
- DataInputStream in = null;
try {
- in = new DataInputStream(new FileInputStream(indexFile));
- in.skipBytes(reduceId * 8);
- long offset = in.readLong();
- long nextOffset = in.readLong();
+ ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
+ ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
return new FileSegmentManagedBuffer(
conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
- offset,
- nextOffset - offset);
- } catch (IOException e) {
+ shuffleIndexRecord.getOffset(),
+ shuffleIndexRecord.getLength());
+ } catch (ExecutionException e) {
throw new RuntimeException("Failed to open file: " + indexFile, e);
- } finally {
- if (in != null) {
- JavaUtils.closeQuietly(in);
- }
}
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
new file mode 100644
index 0000000000..f1ff44a3f7
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import com.google.common.cache.LoadingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.nio.ch.IOUtil;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+/**
+ * Keeps the index information for a particular map output
+ * as an in-memory LongBuffer.
+ */
+public class ShuffleIndexInformation {
+ /** offsets as long buffer */
+ private final LongBuffer offsets;
+
+ public ShuffleIndexInformation(File indexFile) throws IOException {
+ int size = (int)indexFile.length();
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ offsets = buffer.asLongBuffer();
+ DataInputStream dis = null;
+ try {
+ dis = new DataInputStream(new FileInputStream(indexFile));
+ dis.readFully(buffer.array());
+ } finally {
+ if (dis != null) {
+ dis.close();
+ }
+ }
+ }
+
+ /**
+ * Get index offset for a particular reducer.
+ */
+ public ShuffleIndexRecord getIndex(int reduceId) {
+ long offset = offsets.get(reduceId);
+ long nextOffset = offsets.get(reduceId + 1);
+ return new ShuffleIndexRecord(offset, nextOffset - offset);
+ }
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java
new file mode 100644
index 0000000000..6a4fac150a
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+/**
+ * Contains offset and length of the shuffle block data.
+ */
+public class ShuffleIndexRecord {
+ private final long offset;
+ private final long length;
+
+ public ShuffleIndexRecord(long offset, long length) {
+ this.offset = offset;
+ this.length = length;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getLength() {
+ return length;
+ }
+}
+