aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorIlya Ganelin <ilya.ganelin@capitalone.com>2015-04-13 16:28:07 -0700
committerAndrew Or <andrew@databricks.com>2015-04-13 16:28:07 -0700
commitc4ab255e94366ba9b9023d5431f9d2412e0d6dc7 (patch)
treecade698e2139a54ab81957383c3ef2b5c8e8e9f2 /network
parentc5602bdc310cc8f82dc304500bebe40217cba785 (diff)
downloadspark-c4ab255e94366ba9b9023d5431f9d2412e0d6dc7.tar.gz
spark-c4ab255e94366ba9b9023d5431f9d2412e0d6dc7.tar.bz2
spark-c4ab255e94366ba9b9023d5431f9d2412e0d6dc7.zip
[SPARK-5931][CORE] Use consistent naming for time properties
I've added new utility methods to do the conversion from times specified as e.g. 120s, 240ms, 360us to convert to a consistent internal representation. I've updated usage of these constants throughout the code to be consistent. I believe I've captured all usages of time-based properties throughout the code. I've also updated variable names in a number of places to reflect their units for clarity and updated documentation where appropriate. Author: Ilya Ganelin <ilya.ganelin@capitalone.com> Author: Ilya Ganelin <ilganeli@gmail.com> Closes #5236 from ilganeli/SPARK-5931 and squashes the following commits: 4526c81 [Ilya Ganelin] Update configuration.md de3bff9 [Ilya Ganelin] Fixing style errors f5fafcd [Ilya Ganelin] Doc updates 951ca2d [Ilya Ganelin] Made the most recent round of changes bc04e05 [Ilya Ganelin] Minor fixes and doc updates 25d3f52 [Ilya Ganelin] Minor nit fixes 642a06d [Ilya Ganelin] Fixed logic for invalid suffixes and addid matching test 8927e66 [Ilya Ganelin] Fixed handling of -1 69fedcc [Ilya Ganelin] Added test for zero dc7bd08 [Ilya Ganelin] Fixed error in exception handling 7d19cdd [Ilya Ganelin] Added fix for possible NPE 6f651a8 [Ilya Ganelin] Now using regexes to simplify code in parseTimeString. Introduces getTimeAsSec and getTimeAsMs methods in SparkConf. Updated documentation cbd2ca6 [Ilya Ganelin] Formatting error 1a1122c [Ilya Ganelin] Formatting fixes and added m for use as minute formatter 4e48679 [Ilya Ganelin] Fixed priority order and mixed up conversions in a couple spots d4efd26 [Ilya Ganelin] Added time conversion for yarn.scheduler.heartbeat.interval-ms cbf41db [Ilya Ganelin] Got rid of thrown exceptions 1465390 [Ilya Ganelin] Nit 28187bf [Ilya Ganelin] Convert straight to seconds ff40bfe [Ilya Ganelin] Updated tests to fix small bugs 19c31af [Ilya Ganelin] Added cleaner computation of time conversions in tests 6387772 [Ilya Ganelin] Updated suffix handling to handle overlap of units more gracefully 5193d5f [Ilya Ganelin] Resolved merge conflicts 76cfa27 [Ilya Ganelin] [SPARK-5931] Minor nit fixes' bf779b0 [Ilya Ganelin] Special handling of overlapping usffixes for java dd0a680 [Ilya Ganelin] Updated scala code to call into java b2fc965 [Ilya Ganelin] replaced get or default since it's not present in this version of java 39164f9 [Ilya Ganelin] [SPARK-5931] Updated Java conversion to be similar to scala conversion. Updated conversions to clean up code a little using TimeUnit.convert. Added Unit tests 3b126e1 [Ilya Ganelin] Fixed conversion to US from seconds 1858197 [Ilya Ganelin] Fixed bug where all time was being converted to us instead of the appropriate units bac9edf [Ilya Ganelin] More whitespace 8613631 [Ilya Ganelin] Whitespace 1c0c07c [Ilya Ganelin] Updated Java code to add day, minutes, and hours 647b5ac [Ilya Ganelin] Udpated time conversion to use map iterator instead of if fall through 70ac213 [Ilya Ganelin] Fixed remaining usages to be consistent. Updated Java-side time conversion 68f4e93 [Ilya Ganelin] Updated more files to clean up usage of default time strings 3a12dd8 [Ilya Ganelin] Updated host revceiver 5232a36 [Ilya Ganelin] [SPARK-5931] Changed default behavior of time string conversion. 499bdf0 [Ilya Ganelin] Merge branch 'SPARK-5931' of github.com:ilganeli/spark into SPARK-5931 9e2547c [Ilya Ganelin] Reverting doc changes 8f741e1 [Ilya Ganelin] Update JavaUtils.java 34f87c2 [Ilya Ganelin] Update Utils.scala 9a29d8d [Ilya Ganelin] Fixed misuse of time in streaming context test 42477aa [Ilya Ganelin] Updated configuration doc with note on specifying time properties cde9bff [Ilya Ganelin] Updated spark.streaming.blockInterval c6a0095 [Ilya Ganelin] Updated spark.core.connection.auth.wait.timeout 5181597 [Ilya Ganelin] Updated spark.dynamicAllocation.schedulerBacklogTimeout 2fcc91c [Ilya Ganelin] Updated spark.dynamicAllocation.executorIdleTimeout 6d1518e [Ilya Ganelin] Upated spark.speculation.interval 3f1cfc8 [Ilya Ganelin] Updated spark.scheduler.revive.interval 3352d34 [Ilya Ganelin] Updated spark.scheduler.maxRegisteredResourcesWaitingTime 272c215 [Ilya Ganelin] Updated spark.locality.wait 7320c87 [Ilya Ganelin] updated spark.akka.heartbeat.interval 064ebd6 [Ilya Ganelin] Updated usage of spark.cleaner.ttl 21ef3dd [Ilya Ganelin] updated spark.shuffle.sasl.timeout c9f5cad [Ilya Ganelin] Updated spark.shuffle.io.retryWait 4933fda [Ilya Ganelin] Updated usage of spark.storage.blockManagerSlaveTimeout 7db6d2a [Ilya Ganelin] Updated usage of spark.akka.timeout 404f8c3 [Ilya Ganelin] Updated usage of spark.core.connection.ack.wait.timeout 59bf9e1 [Ilya Ganelin] [SPARK-5931] Updated Utils and JavaUtils classes to add helper methods to handle time strings. Updated time strings in a few places to properly parse time
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java66
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/TransportConf.java15
2 files changed, 77 insertions, 4 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 73da9b7346..b6fbace509 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -21,9 +21,13 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,4 +125,66 @@ public class JavaUtils {
}
return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
}
+
+ private static ImmutableMap<String, TimeUnit> timeSuffixes =
+ ImmutableMap.<String, TimeUnit>builder()
+ .put("us", TimeUnit.MICROSECONDS)
+ .put("ms", TimeUnit.MILLISECONDS)
+ .put("s", TimeUnit.SECONDS)
+ .put("m", TimeUnit.MINUTES)
+ .put("min", TimeUnit.MINUTES)
+ .put("h", TimeUnit.HOURS)
+ .put("d", TimeUnit.DAYS)
+ .build();
+
+ /**
+ * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
+ * internal use. If no suffix is provided a direct conversion is attempted.
+ */
+ private static long parseTimeString(String str, TimeUnit unit) {
+ String lower = str.toLowerCase().trim();
+
+ try {
+ String suffix;
+ long val;
+ Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
+ if (m.matches()) {
+ val = Long.parseLong(m.group(1));
+ suffix = m.group(2);
+ } else {
+ throw new NumberFormatException("Failed to parse time string: " + str);
+ }
+
+ // Check for invalid suffixes
+ if (suffix != null && !timeSuffixes.containsKey(suffix)) {
+ throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
+ }
+
+ // If suffix is valid use that, otherwise none was provided and use the default passed
+ return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
+ } catch (NumberFormatException e) {
+ String timeError = "Time must be specified as seconds (s), " +
+ "milliseconds (ms), microseconds (us), minutes (m or min) hour (h), or day (d). " +
+ "E.g. 50s, 100ms, or 250us.";
+
+ throw new NumberFormatException(timeError + "\n" + e.getMessage());
+ }
+ }
+
+ /**
+ * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
+ * no suffix is provided, the passed number is assumed to be in ms.
+ */
+ public static long timeStringAsMs(String str) {
+ return parseTimeString(str, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
+ * no suffix is provided, the passed number is assumed to be in seconds.
+ */
+ public static long timeStringAsSec(String str) {
+ return parseTimeString(str, TimeUnit.SECONDS);
+ }
+
}
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 2eaf3b71d9..0aef7f1987 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -37,8 +37,11 @@ public class TransportConf {
/** Connect timeout in milliseconds. Default 120 secs. */
public int connectionTimeoutMs() {
- int defaultTimeout = conf.getInt("spark.network.timeout", 120);
- return conf.getInt("spark.shuffle.io.connectionTimeout", defaultTimeout) * 1000;
+ long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
+ conf.get("spark.network.timeout", "120s"));
+ long defaultTimeoutMs = JavaUtils.timeStringAsSec(
+ conf.get("spark.shuffle.io.connectionTimeout", defaultNetworkTimeoutS + "s")) * 1000;
+ return (int) defaultTimeoutMs;
}
/** Number of concurrent connections between two nodes for fetching data. */
@@ -68,7 +71,9 @@ public class TransportConf {
public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
/** Timeout for a single round trip of SASL token exchange, in milliseconds. */
- public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; }
+ public int saslRTTimeoutMs() {
+ return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.sasl.timeout", "30s")) * 1000;
+ }
/**
* Max number of times we will try IO exceptions (such as connection timeouts) per request.
@@ -80,7 +85,9 @@ public class TransportConf {
* Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
* Only relevant if maxIORetries &gt; 0.
*/
- public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; }
+ public int ioRetryWaitTimeMs() {
+ return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.io.retryWait", "5s")) * 1000;
+ }
/**
* Minimum size of a block that we should start using memory map rather than reading in through