about summary refs log tree commit diff
path: root/network
diff options
context:
space:
mode:
Diffstat (limited to 'network')
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java | 66
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/util/TransportConf.java | 15
2 files changed, 77 insertions(+), 4 deletions(-)
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 73da9b7346..b6fbace509 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -21,9 +21,13 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,4 +125,66 @@ public class JavaUtils {
}
return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
}
+
+ private static ImmutableMap<String, TimeUnit> timeSuffixes =
+ ImmutableMap.<String, TimeUnit>builder()
+ .put("us", TimeUnit.MICROSECONDS)
+ .put("ms", TimeUnit.MILLISECONDS)
+ .put("s", TimeUnit.SECONDS)
+ .put("m", TimeUnit.MINUTES)
+ .put("min", TimeUnit.MINUTES)
+ .put("h", TimeUnit.HOURS)
+ .put("d", TimeUnit.DAYS)
+ .build();
+
+ /**
+ * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
+ * internal use. If no suffix is provided a direct conversion is attempted.
+ */
+ private static long parseTimeString(String str, TimeUnit unit) {
+ String lower = str.toLowerCase().trim();
+
+ try {
+ String suffix;
+ long val;
+ Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
+ if (m.matches()) {
+ val = Long.parseLong(m.group(1));
+ suffix = m.group(2);
+ } else {
+ throw new NumberFormatException("Failed to parse time string: " + str);
+ }
+
+ // Check for invalid suffixes
+ if (suffix != null && !timeSuffixes.containsKey(suffix)) {
+ throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
+ }
+
+ // If suffix is valid use that, otherwise none was provided and use the default passed
+ return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
+ } catch (NumberFormatException e) {
+ String timeError = "Time must be specified as seconds (s), " +
+ "milliseconds (ms), microseconds (us), minutes (m or min) hour (h), or day (d). " +
+ "E.g. 50s, 100ms, or 250us.";
+
+ throw new NumberFormatException(timeError + "\n" + e.getMessage());
+ }
+ }
+
+ /**
+ * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
+ * no suffix is provided, the passed number is assumed to be in ms.
+ */
+ public static long timeStringAsMs(String str) {
+ return parseTimeString(str, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
+ * no suffix is provided, the passed number is assumed to be in seconds.
+ */
+ public static long timeStringAsSec(String str) {
+ return parseTimeString(str, TimeUnit.SECONDS);
+ }
+
}
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 2eaf3b71d9..0aef7f1987 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -37,8 +37,11 @@ public class TransportConf {
/** Connect timeout in milliseconds. Default 120 secs. */
public int connectionTimeoutMs() {
- int defaultTimeout = conf.getInt("spark.network.timeout", 120);
- return conf.getInt("spark.shuffle.io.connectionTimeout", defaultTimeout) * 1000;
+ long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
+ conf.get("spark.network.timeout", "120s"));
+ long defaultTimeoutMs = JavaUtils.timeStringAsSec(
+ conf.get("spark.shuffle.io.connectionTimeout", defaultNetworkTimeoutS + "s")) * 1000;
+ return (int) defaultTimeoutMs;
}
/** Number of concurrent connections between two nodes for fetching data. */
@@ -68,7 +71,9 @@ public class TransportConf {
public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
/** Timeout for a single round trip of SASL token exchange, in milliseconds. */
- public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; }
+ public int saslRTTimeoutMs() {
+ return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.sasl.timeout", "30s")) * 1000;
+ }
/**
* Max number of times we will try IO exceptions (such as connection timeouts) per request.
@@ -80,7 +85,9 @@ public class TransportConf {
* Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
* Only relevant if maxIORetries &gt; 0.
*/
- public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; }
+ public int ioRetryWaitTimeMs() {
+ return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.io.retryWait", "5s")) * 1000;
+ }
/**
* Minimum size of a block that we should start using memory map rather than reading in through