 core/src/main/scala/org/apache/spark/SparkConf.scala | 90
 core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 3
 core/src/main/scala/org/apache/spark/io/CompressionCodec.scala | 8
 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala | 17
 core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala | 3
 core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala | 3
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 3
 core/src/main/scala/org/apache/spark/util/Utils.scala | 53
 core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 6
 core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala | 4
 core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala | 2
 core/src/test/scala/org/apache/spark/DistributedSuite.scala | 2
 core/src/test/scala/org/apache/spark/SparkConfSuite.scala | 19
 core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala | 8
 core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2
 core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 2
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 6
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 100
 docs/configuration.md | 60
 docs/tuning.md | 2
 examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala | 2
 network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java | 67
 network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java | 107
 23 files changed, 488 insertions(+), 81 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index c1996e0875..a8fc90ad20 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -211,7 +211,74 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
Utils.timeStringAsMs(get(key, defaultValue))
}
+ /**
+ * Get a size parameter as bytes; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then bytes are assumed.
+ * @throws NoSuchElementException
+ */
+ def getSizeAsBytes(key: String): Long = {
+ Utils.byteStringAsBytes(get(key))
+ }
+
+ /**
+ * Get a size parameter as bytes, falling back to a default if not set. If no
+ * suffix is provided then bytes are assumed.
+ */
+ def getSizeAsBytes(key: String, defaultValue: String): Long = {
+ Utils.byteStringAsBytes(get(key, defaultValue))
+ }
+
+ /**
+ * Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then Kibibytes are assumed.
+ * @throws NoSuchElementException
+ */
+ def getSizeAsKb(key: String): Long = {
+ Utils.byteStringAsKb(get(key))
+ }
+
+ /**
+ * Get a size parameter as Kibibytes, falling back to a default if not set. If no
+ * suffix is provided then Kibibytes are assumed.
+ */
+ def getSizeAsKb(key: String, defaultValue: String): Long = {
+ Utils.byteStringAsKb(get(key, defaultValue))
+ }
+
+ /**
+ * Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then Mebibytes are assumed.
+ * @throws NoSuchElementException
+ */
+ def getSizeAsMb(key: String): Long = {
+ Utils.byteStringAsMb(get(key))
+ }
+
+ /**
+ * Get a size parameter as Mebibytes, falling back to a default if not set. If no
+ * suffix is provided then Mebibytes are assumed.
+ */
+ def getSizeAsMb(key: String, defaultValue: String): Long = {
+ Utils.byteStringAsMb(get(key, defaultValue))
+ }
+
+ /**
+ * Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then Gibibytes are assumed.
+ * @throws NoSuchElementException
+ */
+ def getSizeAsGb(key: String): Long = {
+ Utils.byteStringAsGb(get(key))
+ }
+
+ /**
+ * Get a size parameter as Gibibytes, falling back to a default if not set. If no
+ * suffix is provided then Gibibytes are assumed.
+ */
+ def getSizeAsGb(key: String, defaultValue: String): Long = {
+ Utils.byteStringAsGb(get(key, defaultValue))
+ }
+
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
@@ -407,7 +474,13 @@ private[spark] object SparkConf extends Logging {
"The spark.cache.class property is no longer being used! Specify storage levels using " +
"the RDD.persist() method instead."),
DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
- "Please use spark.{driver,executor}.userClassPathFirst instead."))
+ "Please use spark.{driver,executor}.userClassPathFirst instead."),
+ DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
+ "Please use spark.kryoserializer.buffer instead. The default value for " +
+ "spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
+ "are no longer accepted. To specify the equivalent now, one may use '64k'.")
+ )
+
Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
}
@@ -432,6 +505,21 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
translation = s => s"${s.toLong * 10}s")),
+ "spark.reducer.maxSizeInFlight" -> Seq(
+ AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
+ "spark.kryoserializer.buffer" ->
+ Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
+ translation = s => s"${(s.toDouble * 1000).toInt}k")),
+ "spark.kryoserializer.buffer.max" -> Seq(
+ AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
+ "spark.shuffle.file.buffer" -> Seq(
+ AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
+ "spark.executor.logs.rolling.maxSize" -> Seq(
+ AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
+ "spark.io.compression.snappy.blockSize" -> Seq(
+ AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
+ "spark.io.compression.lz4.blockSize" -> Seq(
+ AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
"spark.rpc.numRetries" -> Seq(
AlternateConfig("spark.akka.num.retries", "1.4")),
"spark.rpc.retry.wait" -> Seq(
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 23b02e6033..a0c9b5e63c 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -74,7 +74,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
} else {
None
}
- blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
+ // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided
+ blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
}
setConf(SparkEnv.get.conf)
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0709b6d689..0756cdb2ed 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -97,7 +97,7 @@ private[spark] object CompressionCodec {
/**
* :: DeveloperApi ::
* LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
- * Block size can be configured by `spark.io.compression.lz4.block.size`.
+ * Block size can be configured by `spark.io.compression.lz4.blockSize`.
*
* Note: The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
@@ -107,7 +107,7 @@ private[spark] object CompressionCodec {
class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
- val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768)
+ val blockSize = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt
new LZ4BlockOutputStream(s, blockSize)
}
@@ -137,7 +137,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
/**
* :: DeveloperApi ::
* Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
- * Block size can be configured by `spark.io.compression.snappy.block.size`.
+ * Block size can be configured by `spark.io.compression.snappy.blockSize`.
*
* Note: The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
@@ -153,7 +153,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
}
override def compressedOutputStream(s: OutputStream): OutputStream = {
- val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
+ val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
new SnappyOutputStream(s, blockSize)
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 579fb6624e..754832b8a4 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -49,16 +49,17 @@ class KryoSerializer(conf: SparkConf)
with Logging
with Serializable {
- private val bufferSizeMb = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064)
- if (bufferSizeMb >= 2048) {
- throw new IllegalArgumentException("spark.kryoserializer.buffer.mb must be less than " +
- s"2048 mb, got: + $bufferSizeMb mb.")
+ private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
+
+ if (bufferSizeKb >= 2048 * 1024) {
+ throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
+ s"2048 mb, got: ${bufferSizeKb / 1024} mb.")
}
- private val bufferSize = (bufferSizeMb * 1024 * 1024).toInt
+ private val bufferSize = (bufferSizeKb * 1024).toInt
- val maxBufferSizeMb = conf.getInt("spark.kryoserializer.buffer.max.mb", 64)
+ val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
if (maxBufferSizeMb >= 2048) {
- throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " +
+ throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
s"2048 mb, got: + $maxBufferSizeMb mb.")
}
private val maxBufferSize = maxBufferSizeMb * 1024 * 1024
@@ -173,7 +174,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
} catch {
case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
- "increase spark.kryoserializer.buffer.max.mb value.")
+ "increase spark.kryoserializer.buffer.max value.")
}
ByteBuffer.wrap(output.toBytes)
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 538e150ead..e9b4e2b955 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -78,7 +78,8 @@ class FileShuffleBlockManager(conf: SparkConf)
private val consolidateShuffleFiles =
conf.getBoolean("spark.shuffle.consolidateFiles", false)
- private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+ // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+ private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 7a2c5ae32d..80374adc44 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -79,7 +79,8 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockManager,
blocksByAddress,
serializer,
- SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
+ // Note: use getSizeAsMb (not bytes) to maintain backwards compatibility if no units are provided
+ SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)
val itr = blockFetcherItr.flatMap(unpackBlock)
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 4b232ae7d3..1f45956282 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -31,8 +31,7 @@ import org.apache.spark.util.Utils
private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
extends BlockStore(blockManager) with Logging {
- val minMemoryMapBytes = blockManager.conf.getLong(
- "spark.storage.memoryMapThreshold", 2 * 1024L * 1024L)
+ val minMemoryMapBytes = blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
override def getSize(blockId: BlockId): Long = {
diskManager.getFile(blockId.name).length
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 342bc9a06d..4c028c06a5 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1020,21 +1020,48 @@ private[spark] object Utils extends Logging {
}
/**
- * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
+ * Convert a passed byte string (e.g. 50b, 100k, or 250m) to bytes for internal use.
+ *
+ * If no suffix is provided, the passed number is assumed to be in bytes.
+ */
+ def byteStringAsBytes(str: String): Long = {
+ JavaUtils.byteStringAsBytes(str)
+ }
+
+ /**
+ * Convert a passed byte string (e.g. 50b, 100k, or 250m) to kibibytes for internal use.
+ *
+ * If no suffix is provided, the passed number is assumed to be in kibibytes.
+ */
+ def byteStringAsKb(str: String): Long = {
+ JavaUtils.byteStringAsKb(str)
+ }
+
+ /**
+ * Convert a passed byte string (e.g. 50b, 100k, or 250m) to mebibytes for internal use.
+ *
+ * If no suffix is provided, the passed number is assumed to be in mebibytes.
+ */
+ def byteStringAsMb(str: String): Long = {
+ JavaUtils.byteStringAsMb(str)
+ }
+
+ /**
+ * Convert a passed byte string (e.g. 50b, 100k, or 250m, 500g) to gibibytes for internal use.
+ *
+ * If no suffix is provided, the passed number is assumed to be in gibibytes.
+ */
+ def byteStringAsGb(str: String): Long = {
+ JavaUtils.byteStringAsGb(str)
+ }
+
+ /**
+ * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of mebibytes.
*/
def memoryStringToMb(str: String): Int = {
- val lower = str.toLowerCase
- if (lower.endsWith("k")) {
- (lower.substring(0, lower.length-1).toLong / 1024).toInt
- } else if (lower.endsWith("m")) {
- lower.substring(0, lower.length-1).toInt
- } else if (lower.endsWith("g")) {
- lower.substring(0, lower.length-1).toInt * 1024
- } else if (lower.endsWith("t")) {
- lower.substring(0, lower.length-1).toInt * 1024 * 1024
- } else {// no suffix, so it's just a number in bytes
- (lower.toLong / 1024 / 1024).toInt
- }
+ // Convert to bytes, rather than directly to MB, because when no units are specified the unit
+ // is assumed to be bytes
+ (JavaUtils.byteStringAsBytes(str) / 1024 / 1024).toInt
}
/**
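
A short sketch of the resulting behavior (assuming the calls are compiled inside the org.apache.spark package, since Utils is private[spark]):

    import org.apache.spark.util.Utils

    // Suffixed values convert as expected.
    Utils.byteStringAsBytes("32k")   // 32768
    Utils.byteStringAsMb("1g")       // 1024
    Utils.byteStringAsGb("1024m")    // 1

    // A bare number is read in the unit of the method called...
    Utils.byteStringAsKb("64")       // 64 (64 KiB)
    Utils.byteStringAsBytes("64")    // 64 (64 bytes)

    // ...whereas memoryStringToMb keeps its old contract: no suffix means bytes.
    Utils.memoryStringToMb("1048576")  // 1
    Utils.memoryStringToMb("300m")     // 300
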
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 30dd7f22e4..f912049563 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -89,8 +89,10 @@ class ExternalAppendOnlyMap[K, V, C](
// Number of bytes spilled in total
private var _diskBytesSpilled = 0L
-
- private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+
+ // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+ private val fileBufferSize =
+ sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
// Write metrics for current spill
private var curWriteMetrics: ShuffleWriteMetrics = _
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 79a695fb62..ef3cac6225 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -108,7 +108,9 @@ private[spark] class ExternalSorter[K, V, C](
private val conf = SparkEnv.get.conf
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
- private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+
+ // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+ private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
// Size of object batches when reading/writing from serializers.
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index e579421676..7138b4b8e4 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -138,7 +138,7 @@ private[spark] object RollingFileAppender {
val STRATEGY_DEFAULT = ""
val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval"
val INTERVAL_DEFAULT = "daily"
- val SIZE_PROPERTY = "spark.executor.logs.rolling.size.maxBytes"
+ val SIZE_PROPERTY = "spark.executor.logs.rolling.maxSize"
val SIZE_DEFAULT = (1024 * 1024).toString
val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
val DEFAULT_BUFFER_SIZE = 8192
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 97ea3578aa..96a9c207ad 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -77,7 +77,7 @@ class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {
}
test("groupByKey where map output sizes exceed maxMbInFlight") {
- val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
+ val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "1m")
sc = new SparkContext(clusterUrl, "test", conf)
// This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
// file should be about 2.5 MB
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 272e6af051..68d08e32f9 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -24,11 +24,30 @@ import scala.language.postfixOps
import scala.util.{Try, Random}
import org.scalatest.FunSuite
+import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
import com.esotericsoftware.kryo.Kryo
class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
+ test("Test byteString conversion") {
+ val conf = new SparkConf()
+ // Simply exercise the API, we don't need a complete conversion test since that's handled in
+ // UtilsSuite.scala
+ assert(conf.getSizeAsBytes("fake","1k") === ByteUnit.KiB.toBytes(1))
+ assert(conf.getSizeAsKb("fake","1k") === ByteUnit.KiB.toKiB(1))
+ assert(conf.getSizeAsMb("fake","1k") === ByteUnit.KiB.toMiB(1))
+ assert(conf.getSizeAsGb("fake","1k") === ByteUnit.KiB.toGiB(1))
+ }
+
+ test("Test timeString conversion") {
+ val conf = new SparkConf()
+ // Simply exercise the API, we don't need a complete conversion test since that's handled in
+ // UtilsSuite.scala
+ assert(conf.getTimeAsMs("fake","1ms") === TimeUnit.MILLISECONDS.toMillis(1))
+ assert(conf.getTimeAsSeconds("fake","1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000))
+ }
+
test("loading from system properties") {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
index 967c9e9899..da98d09184 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
@@ -33,8 +33,8 @@ class KryoSerializerResizableOutputSuite extends FunSuite {
test("kryo without resizable output buffer should fail on large array") {
val conf = new SparkConf(false)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.set("spark.kryoserializer.buffer.mb", "1")
- conf.set("spark.kryoserializer.buffer.max.mb", "1")
+ conf.set("spark.kryoserializer.buffer", "1m")
+ conf.set("spark.kryoserializer.buffer.max", "1m")
val sc = new SparkContext("local", "test", conf)
intercept[SparkException](sc.parallelize(x).collect())
LocalSparkContext.stop(sc)
@@ -43,8 +43,8 @@ class KryoSerializerResizableOutputSuite extends FunSuite {
test("kryo with resizable output buffer should succeed on large array") {
val conf = new SparkConf(false)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.set("spark.kryoserializer.buffer.mb", "1")
- conf.set("spark.kryoserializer.buffer.max.mb", "2")
+ conf.set("spark.kryoserializer.buffer", "1m")
+ conf.set("spark.kryoserializer.buffer.max", "2m")
val sc = new SparkContext("local", "test", conf)
assert(sc.parallelize(x).collect() === x)
LocalSparkContext.stop(sc)
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index b070a54aa9..1b13559e77 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -269,7 +269,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("serialization buffer overflow reporting") {
import org.apache.spark.SparkException
- val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max.mb"
+ val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
val largeObject = (1 to 1000000).toArray
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index ffa5162a31..f647200402 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -50,7 +50,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
val allStores = new ArrayBuffer[BlockManager]
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
- conf.set("spark.kryoserializer.buffer.mb", "1")
+ conf.set("spark.kryoserializer.buffer", "1m")
val serializer = new KryoSerializer(conf)
// Implicitly convert strings to BlockIds for test clarity.
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 7d82a7c66a..6957bc72e9 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -55,7 +55,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
val shuffleManager = new HashShuffleManager(conf)
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
- conf.set("spark.kryoserializer.buffer.mb", "1")
+ conf.set("spark.kryoserializer.buffer", "1m")
val serializer = new KryoSerializer(conf)
// Implicitly convert strings to BlockIds for test clarity.
@@ -814,14 +814,14 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
// be nice to refactor classes involved in disk storage in a way that
// allows for easier testing.
val blockManager = mock(classOf[BlockManager])
- when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
+ when(blockManager.conf).thenReturn(conf.clone.set(confKey, "0"))
val diskBlockManager = new DiskBlockManager(blockManager, conf)
val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
val mapped = diskStoreMapped.getBytes(blockId).get
- when(blockManager.conf).thenReturn(conf.clone.set(confKey, (1000 * 1000).toString))
+ when(blockManager.conf).thenReturn(conf.clone.set(confKey, "1m"))
val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
val notMapped = diskStoreNotMapped.getBytes(blockId).get
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 1ba99803f5..62a3cbcdf6 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,7 +23,6 @@ import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
import java.util.concurrent.TimeUnit
import java.util.Locale
-import java.util.PriorityQueue
import scala.collection.mutable.ListBuffer
import scala.util.Random
@@ -35,6 +34,7 @@ import org.scalatest.FunSuite
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.spark.network.util.ByteUnit
import org.apache.spark.SparkConf
class UtilsSuite extends FunSuite with ResetSystemProperties {
@@ -66,6 +66,10 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
// Test invalid strings
intercept[NumberFormatException] {
+ Utils.timeStringAsMs("600l")
+ }
+
+ intercept[NumberFormatException] {
Utils.timeStringAsMs("This breaks 600s")
}
@@ -82,6 +86,100 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
}
}
+ test("Test byteString conversion") {
+ // Test zero
+ assert(Utils.byteStringAsBytes("0") === 0)
+
+ assert(Utils.byteStringAsGb("1") === 1)
+ assert(Utils.byteStringAsGb("1g") === 1)
+ assert(Utils.byteStringAsGb("1023m") === 0)
+ assert(Utils.byteStringAsGb("1024m") === 1)
+ assert(Utils.byteStringAsGb("1048575k") === 0)
+ assert(Utils.byteStringAsGb("1048576k") === 1)
+ assert(Utils.byteStringAsGb("1k") === 0)
+ assert(Utils.byteStringAsGb("1t") === ByteUnit.TiB.toGiB(1))
+ assert(Utils.byteStringAsGb("1p") === ByteUnit.PiB.toGiB(1))
+
+ assert(Utils.byteStringAsMb("1") === 1)
+ assert(Utils.byteStringAsMb("1m") === 1)
+ assert(Utils.byteStringAsMb("1048575b") === 0)
+ assert(Utils.byteStringAsMb("1048576b") === 1)
+ assert(Utils.byteStringAsMb("1023k") === 0)
+ assert(Utils.byteStringAsMb("1024k") === 1)
+ assert(Utils.byteStringAsMb("3645k") === 3)
+ assert(Utils.byteStringAsMb("1024gb") === 1048576)
+ assert(Utils.byteStringAsMb("1g") === ByteUnit.GiB.toMiB(1))
+ assert(Utils.byteStringAsMb("1t") === ByteUnit.TiB.toMiB(1))
+ assert(Utils.byteStringAsMb("1p") === ByteUnit.PiB.toMiB(1))
+
+ assert(Utils.byteStringAsKb("1") === 1)
+ assert(Utils.byteStringAsKb("1k") === 1)
+ assert(Utils.byteStringAsKb("1m") === ByteUnit.MiB.toKiB(1))
+ assert(Utils.byteStringAsKb("1g") === ByteUnit.GiB.toKiB(1))
+ assert(Utils.byteStringAsKb("1t") === ByteUnit.TiB.toKiB(1))
+ assert(Utils.byteStringAsKb("1p") === ByteUnit.PiB.toKiB(1))
+
+ assert(Utils.byteStringAsBytes("1") === 1)
+ assert(Utils.byteStringAsBytes("1k") === ByteUnit.KiB.toBytes(1))
+ assert(Utils.byteStringAsBytes("1m") === ByteUnit.MiB.toBytes(1))
+ assert(Utils.byteStringAsBytes("1g") === ByteUnit.GiB.toBytes(1))
+ assert(Utils.byteStringAsBytes("1t") === ByteUnit.TiB.toBytes(1))
+ assert(Utils.byteStringAsBytes("1p") === ByteUnit.PiB.toBytes(1))
+
+ // Overflow handling: 1073741824p exceeds Long.MAX_VALUE if converted straight to bytes.
+ // This demonstrates that we can have e.g. 1024^3 PiB without overflowing.
+ assert(Utils.byteStringAsGb("1073741824p") === ByteUnit.PiB.toGiB(1073741824))
+ assert(Utils.byteStringAsMb("1073741824p") === ByteUnit.PiB.toMiB(1073741824))
+
+ // Run this to confirm it doesn't throw an exception
+ assert(Utils.byteStringAsBytes("9223372036854775807") === 9223372036854775807L)
+ assert(ByteUnit.PiB.toPiB(9223372036854775807L) === 9223372036854775807L)
+
+ // Test overflow exception
+ intercept[IllegalArgumentException] {
+ // This value exceeds Long.MAX when converted to bytes
+ Utils.byteStringAsBytes("9223372036854775808")
+ }
+
+ // Test overflow exception
+ intercept[IllegalArgumentException] {
+ // This value exceeds Long.MAX when converted to TB
+ ByteUnit.PiB.toTiB(9223372036854775807L)
+ }
+
+ // Test fractional string
+ intercept[NumberFormatException] {
+ Utils.byteStringAsMb("0.064")
+ }
+
+ // Test fractional string
+ intercept[NumberFormatException] {
+ Utils.byteStringAsMb("0.064m")
+ }
+
+ // Test invalid strings
+ intercept[NumberFormatException] {
+ Utils.byteStringAsBytes("500ub")
+ }
+
+ // Test invalid strings
+ intercept[NumberFormatException] {
+ Utils.byteStringAsBytes("This breaks 600b")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.byteStringAsBytes("This breaks 600")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.byteStringAsBytes("600gb This breaks")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.byteStringAsBytes("This 123mb breaks")
+ }
+ }
+
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")
diff --git a/docs/configuration.md b/docs/configuration.md
index d587b91124..72105feba4 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -48,6 +48,17 @@ The following format is accepted:
5d (days)
1y (years)
+
+Properties that specify a byte size should be configured with a unit of size.
+The following format is accepted:
+
+ 1b (bytes)
+ 1k or 1kb (kibibytes = 1024 bytes)
+ 1m or 1mb (mebibytes = 1024 kibibytes)
+ 1g or 1gb (gibibytes = 1024 mebibytes)
+ 1t or 1tb (tebibytes = 1024 gibibytes)
+ 1p or 1pb (pebibytes = 1024 tebibytes)
+
## Dynamically Loading Spark Properties
In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
instance, if you'd like to run the same application with different masters or different
@@ -272,12 +283,11 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.executor.logs.rolling.size.maxBytes</code></td>
+ <td><code>spark.executor.logs.rolling.maxSize</code></td>
<td>(none)</td>
<td>
Set the max size of the file by which the executor logs will be rolled over.
- Rolling is disabled by default. Value is set in terms of bytes.
- See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+ Rolling is disabled by default. See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
for automatic cleaning of old logs.
</td>
</tr>
@@ -366,10 +376,10 @@ Apart from these, the following properties are also available, and may be useful
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
- <td><code>spark.reducer.maxMbInFlight</code></td>
- <td>48</td>
+ <td><code>spark.reducer.maxSizeInFlight</code></td>
+ <td>48m</td>
<td>
- Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since
+ Maximum size of map outputs to fetch simultaneously from each reduce task. Since
each output requires us to create a buffer to receive it, this represents a fixed memory
overhead per reduce task, so keep it small unless you have a large amount of memory.
</td>
@@ -403,10 +413,10 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.shuffle.file.buffer.kb</code></td>
- <td>32</td>
+ <td><code>spark.shuffle.file.buffer</code></td>
+ <td>32k</td>
<td>
- Size of the in-memory buffer for each shuffle file output stream, in kilobytes. These buffers
+ Size of the in-memory buffer for each shuffle file output stream. These buffers
reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
</td>
</tr>
@@ -582,18 +592,18 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.io.compression.lz4.block.size</code></td>
- <td>32768</td>
+ <td><code>spark.io.compression.lz4.blockSize</code></td>
+ <td>32k</td>
<td>
- Block size (in bytes) used in LZ4 compression, in the case when LZ4 compression codec
+ Block size used in LZ4 compression, in the case when LZ4 compression codec
is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
</td>
</tr>
<tr>
- <td><code>spark.io.compression.snappy.block.size</code></td>
- <td>32768</td>
+ <td><code>spark.io.compression.snappy.blockSize</code></td>
+ <td>32k</td>
<td>
- Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec
+ Block size used in Snappy compression, in the case when Snappy compression codec
is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
</td>
</tr>
@@ -641,19 +651,19 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.kryoserializer.buffer.max.mb</code></td>
- <td>64</td>
+ <td><code>spark.kryoserializer.buffer.max</code></td>
+ <td>64m</td>
<td>
- Maximum allowable size of Kryo serialization buffer, in megabytes. This must be larger than any
+ Maximum allowable size of Kryo serialization buffer. This must be larger than any
object you attempt to serialize. Increase this if you get a "buffer limit exceeded" exception
inside Kryo.
</td>
</tr>
<tr>
- <td><code>spark.kryoserializer.buffer.mb</code></td>
- <td>0.064</td>
+ <td><code>spark.kryoserializer.buffer</code></td>
+ <td>64k</td>
<td>
- Initial size of Kryo's serialization buffer, in megabytes. Note that there will be one buffer
+ Initial size of Kryo's serialization buffer. Note that there will be one buffer
<i>per core</i> on each worker. This buffer will grow up to
<code>spark.kryoserializer.buffer.max.mb</code> if needed.
</td>
@@ -698,9 +708,9 @@ Apart from these, the following properties are also available, and may be useful
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.broadcast.blockSize</code></td>
- <td>4096</td>
+ <td>4m</td>
<td>
- Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
+ Size of each piece of a block for <code>TorrentBroadcastFactory</code>.
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is
too small, <code>BlockManager</code> might take a performance hit.
</td>
@@ -816,9 +826,9 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.storage.memoryMapThreshold</code></td>
- <td>2097152</td>
+ <td>2m</td>
<td>
- Size of a block, in bytes, above which Spark memory maps when reading a block from disk.
+ Size of a block above which Spark memory maps when reading a block from disk.
This prevents Spark from memory mapping very small blocks. In general, memory
mapping has high overhead for blocks close to or below the page size of the operating system.
</td>
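
A hedged sketch of how the renamed properties documented above would be set after this change (key names come from the tables above; the values are arbitrary):

    import org.apache.spark.SparkConf

    // New-style keys take a unit suffix.
    val conf = new SparkConf()
      .set("spark.shuffle.file.buffer", "64k")
      .set("spark.reducer.maxSizeInFlight", "96m")
      .set("spark.io.compression.lz4.blockSize", "64k")

    // A conf that still uses a deprecated key: reads of the new key fall back to it
    // through the alternate-config table added in SparkConf.scala above.
    val legacy = new SparkConf().set("spark.reducer.maxMbInFlight", "96")
    legacy.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m")  // 96
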
diff --git a/docs/tuning.md b/docs/tuning.md
index cbd227868b..1cb223e74f 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -60,7 +60,7 @@ val sc = new SparkContext(conf)
The [Kryo documentation](https://github.com/EsotericSoftware/kryo) describes more advanced
registration options, such as adding custom serialization code.
-If your objects are large, you may also need to increase the `spark.kryoserializer.buffer.mb`
+If your objects are large, you may also need to increase the `spark.kryoserializer.buffer`
config property. The default is 2, but this value needs to be large enough to hold the *largest*
object you will serialize.
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
index 0bc36ea65e..99588b0984 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala
@@ -100,7 +100,7 @@ object MovieLensALS {
val conf = new SparkConf().setAppName(s"MovieLensALS with $params")
if (params.kryo) {
conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating]))
- .set("spark.kryoserializer.buffer.mb", "8")
+ .set("spark.kryoserializer.buffer", "8m")
}
val sc = new SparkContext(conf)
diff --git a/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java b/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
new file mode 100644
index 0000000000..36d655017f
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.util;
+
+public enum ByteUnit {
+ BYTE (1),
+ KiB (1024L),
+ MiB ((long) Math.pow(1024L, 2L)),
+ GiB ((long) Math.pow(1024L, 3L)),
+ TiB ((long) Math.pow(1024L, 4L)),
+ PiB ((long) Math.pow(1024L, 5L));
+
+ private ByteUnit(long multiplier) {
+ this.multiplier = multiplier;
+ }
+
+ // Interpret the provided number (d), given in unit (u), as a count of this unit type.
+ // E.g. KiB.convertFrom(1, MiB) interprets 1 MiB as its KiB representation = 1024 KiB
+ public long convertFrom(long d, ByteUnit u) {
+ return u.convertTo(d, this);
+ }
+
+ // Convert the provided number (d) interpreted as this unit type to unit type (u).
+ public long convertTo(long d, ByteUnit u) {
+ if (multiplier > u.multiplier) {
+ long ratio = multiplier / u.multiplier;
+ if (Long.MAX_VALUE / ratio < d) {
+ throw new IllegalArgumentException("Conversion of " + d + " exceeds Long.MAX_VALUE in "
+ + name() + ". Try a larger unit (e.g. MiB instead of KiB)");
+ }
+ return d * ratio;
+ } else {
+ // Perform operations in this order to avoid potential overflow
+ // when computing d * multiplier
+ return d / (u.multiplier / multiplier);
+ }
+ }
+
+ public double toBytes(long d) {
+ if (d < 0) {
+ throw new IllegalArgumentException("Negative size value. Size must be positive: " + d);
+ }
+ return d * multiplier;
+ }
+
+ public long toKiB(long d) { return convertTo(d, KiB); }
+ public long toMiB(long d) { return convertTo(d, MiB); }
+ public long toGiB(long d) { return convertTo(d, GiB); }
+ public long toTiB(long d) { return convertTo(d, TiB); }
+ public long toPiB(long d) { return convertTo(d, PiB); }
+
+ private final long multiplier;
+}
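
A brief sketch of the enum's conversion behavior, called from Scala here for consistency with the rest of the patch:

    import org.apache.spark.network.util.ByteUnit

    ByteUnit.MiB.toKiB(1)    // 1024
    ByteUnit.PiB.toGiB(1)    // 1048576
    ByteUnit.KiB.toGiB(1)    // 0: downward conversions truncate via integer division
    ByteUnit.KiB.toBytes(1)  // 1024.0: note toBytes returns a double, unlike the others

    // convertFrom(d, u) reads d as a quantity of u and expresses it in this unit.
    ByteUnit.KiB.convertFrom(1, ByteUnit.MiB)  // 1024

    // Upward conversions that would overflow a Long throw IllegalArgumentException,
    // e.g. ByteUnit.PiB.toKiB(Long.MaxValue).
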
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index b6fbace509..6b514aaa12 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -126,7 +126,7 @@ public class JavaUtils {
return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
}
- private static ImmutableMap<String, TimeUnit> timeSuffixes =
+ private static final ImmutableMap<String, TimeUnit> timeSuffixes =
ImmutableMap.<String, TimeUnit>builder()
.put("us", TimeUnit.MICROSECONDS)
.put("ms", TimeUnit.MILLISECONDS)
@@ -137,6 +137,21 @@ public class JavaUtils {
.put("d", TimeUnit.DAYS)
.build();
+ private static final ImmutableMap<String, ByteUnit> byteSuffixes =
+ ImmutableMap.<String, ByteUnit>builder()
+ .put("b", ByteUnit.BYTE)
+ .put("k", ByteUnit.KiB)
+ .put("kb", ByteUnit.KiB)
+ .put("m", ByteUnit.MiB)
+ .put("mb", ByteUnit.MiB)
+ .put("g", ByteUnit.GiB)
+ .put("gb", ByteUnit.GiB)
+ .put("t", ByteUnit.TiB)
+ .put("tb", ByteUnit.TiB)
+ .put("p", ByteUnit.PiB)
+ .put("pb", ByteUnit.PiB)
+ .build();
+
/**
* Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
* internal use. If no suffix is provided a direct conversion is attempted.
@@ -145,16 +160,14 @@ public class JavaUtils {
String lower = str.toLowerCase().trim();
try {
- String suffix;
- long val;
Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
- if (m.matches()) {
- val = Long.parseLong(m.group(1));
- suffix = m.group(2);
- } else {
+ if (!m.matches()) {
throw new NumberFormatException("Failed to parse time string: " + str);
}
+ long val = Long.parseLong(m.group(1));
+ String suffix = m.group(2);
+
// Check for invalid suffixes
if (suffix != null && !timeSuffixes.containsKey(suffix)) {
throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
@@ -164,7 +177,7 @@ public class JavaUtils {
return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
} catch (NumberFormatException e) {
String timeError = "Time must be specified as seconds (s), " +
- "milliseconds (ms), microseconds (us), minutes (m or min) hour (h), or day (d). " +
+ "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). " +
"E.g. 50s, 100ms, or 250us.";
throw new NumberFormatException(timeError + "\n" + e.getMessage());
@@ -186,5 +199,83 @@ public class JavaUtils {
public static long timeStringAsSec(String str) {
return parseTimeString(str, TimeUnit.SECONDS);
}
+
+ /**
+ * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to the given ByteUnit for
+ * internal use. If no suffix is provided, the value is assumed to already be in the
+ * given unit.
+ */
+ private static long parseByteString(String str, ByteUnit unit) {
+ String lower = str.toLowerCase().trim();
+
+ try {
+ Matcher m = Pattern.compile("([0-9]+)([a-z]+)?").matcher(lower);
+ Matcher fractionMatcher = Pattern.compile("([0-9]+\\.[0-9]+)([a-z]+)?").matcher(lower);
+
+ if (m.matches()) {
+ long val = Long.parseLong(m.group(1));
+ String suffix = m.group(2);
+
+ // Check for invalid suffixes
+ if (suffix != null && !byteSuffixes.containsKey(suffix)) {
+ throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
+ }
+
+ // If suffix is valid use that, otherwise none was provided and use the default passed
+ return unit.convertFrom(val, suffix != null ? byteSuffixes.get(suffix) : unit);
+ } else if (fractionMatcher.matches()) {
+ throw new NumberFormatException("Fractional values are not supported. Input was: "
+ + fractionMatcher.group(1));
+ } else {
+ throw new NumberFormatException("Failed to parse byte string: " + str);
+ }
+
+ } catch (NumberFormatException e) {
+ String byteError = "Size must be specified as bytes (b), " +
+ "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes (p). " +
+ "E.g. 50b, 100k, or 250m.";
+ throw new NumberFormatException(byteError + "\n" + e.getMessage());
+ }
+ }
+
+ /**
+ * Convert a passed byte string (e.g. 50b, 100k, or 250m) to bytes for
+ * internal use.
+ *
+ * If no suffix is provided, the passed number is assumed to be in bytes.
+ */
+ public static long byteStringAsBytes(String str) {
+ return parseByteString(str, ByteUnit.BYTE);
+ }
+
+ /**
+ * Convert a passed byte string (e.g. 50b, 100k, or 250m) to kibibytes for
+ * internal use.
+ *
+ * If no suffix is provided, the passed number is assumed to be in kibibytes.
+ */
+ public static long byteStringAsKb(String str) {
+ return parseByteString(str, ByteUnit.KiB);
+ }
+
+ /**
+ * Convert a passed byte string (e.g. 50b, 100k, or 250m) to mebibytes for
+ * internal use.
+ *
+ * If no suffix is provided, the passed number is assumed to be in mebibytes.
+ */
+ public static long byteStringAsMb(String str) {
+ return parseByteString(str, ByteUnit.MiB);
+ }
+
+ /**
+ * Convert a passed byte string (e.g. 50b, 100k, or 250m) to gibibytes for
+ * internal use.
+ *
+ * If no suffix is provided, the passed number is assumed to be in gibibytes.
+ */
+ public static long byteStringAsGb(String str) {
+ return parseByteString(str, ByteUnit.GiB);
+ }
}
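
Finally, a small sketch of the parsing rules these helpers enforce (values chosen only for illustration):

    import org.apache.spark.network.util.JavaUtils

    JavaUtils.byteStringAsBytes("500b")   // 500
    JavaUtils.byteStringAsBytes("100kb")  // 102400: "k" and "kb" map to the same ByteUnit
    JavaUtils.byteStringAsMb("250")       // 250: a bare number takes the caller's unit
    JavaUtils.byteStringAsGb("1t")        // 1024

    // Each of these throws NumberFormatException:
    //   JavaUtils.byteStringAsBytes("0.5m")      fractional values are rejected
    //   JavaUtils.byteStringAsBytes("64q")       unknown suffix
    //   JavaUtils.byteStringAsBytes("size 64m")  anything beyond <digits><suffix> fails to match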