author    Kevin Yu <qyu@us.ibm.com>    2016-01-07 21:13:17 -0800
committer Reynold Xin <rxin@databricks.com>    2016-01-07 21:13:17 -0800
commit    5028a001d51a9e9a13e3c39f6a080618f3425d87 (patch)
tree      774a461be1ac0b6b8f590ed9ac5a740ef37f49e5
parent    28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4 (diff)
[SPARK-12317][SQL] Support units (m,k,g) in SQLConf
This PR continues the previously closed PR 10314. With this change, SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE accepts memory string conventions as input; for example, the user can now specify 10g for SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE in SQLConf. marmbrus srowen: Can you help review these code changes? Thanks.

Author: Kevin Yu <qyu@us.ibm.com>

Closes #10629 from kevinyu98/spark-12317.
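As a quick illustration of the new behavior (a hedged sketch: it assumes a SQLContext named sqlContext, as in the test suite below, and uses the public setConf(key, value) API), the target size can now be given either as a raw byte count or as a memory string:

  // Both calls configure the same target of 64 MiB.
  sqlContext.setConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")
  sqlContext.setConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "64m")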
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala       22
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala  39
2 files changed, 60 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 26c00dc250..7976795ff5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.parser.ParserConf
+import org.apache.spark.util.Utils
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
@@ -115,6 +116,25 @@ private[spark] object SQLConf {
}
}, _.toString, doc, isPublic)
+ def longMemConf(
+ key: String,
+ defaultValue: Option[Long] = None,
+ doc: String = "",
+ isPublic: Boolean = true): SQLConfEntry[Long] =
+ SQLConfEntry(key, defaultValue, { v =>
+ try {
+ v.toLong
+ } catch {
+ case _: NumberFormatException =>
+ try {
+ Utils.byteStringAsBytes(v)
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"$key should be long, but was $v")
+ }
+ }
+ }, _.toString, doc, isPublic)
+
def doubleConf(
key: String,
defaultValue: Option[Double] = None,
@@ -235,7 +255,7 @@ private[spark] object SQLConf {
doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
- longConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
+ longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
defaultValue = Some(64 * 1024 * 1024),
doc = "The target post-shuffle input size in bytes of a task.")
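For readers who want to see the fallback in isolation, here is a minimal, self-contained sketch of the two-step parse that longMemConf performs: first a plain Long, then a memory-string parse, and only then the IllegalArgumentException. The helper below is my own stand-in for Utils.byteStringAsBytes and only handles the k/m/g suffixes exercised by the test suite; the real Spark utility accepts more forms (kb, mb, gb, t, p, ...) and also rejects negative memory strings such as -1g.

  object MemConfSketch {
    // k/m/g only; a simplified stand-in for org.apache.spark.util.Utils.byteStringAsBytes.
    private val units = Map('k' -> 1024L, 'm' -> 1024L * 1024L, 'g' -> 1024L * 1024L * 1024L)

    private def byteStringAsBytes(v: String): Long = {
      val s = v.trim.toLowerCase
      val factor = units.getOrElse(s.last, throw new NumberFormatException(s"bad suffix in $v"))
      Math.multiplyExact(s.dropRight(1).toLong, factor) // overflow surfaces as ArithmeticException
    }

    def parseLongMem(key: String, v: String): Long =
      try v.toLong
      catch {
        case _: NumberFormatException =>
          try byteStringAsBytes(v)
          catch {
            case _: NumberFormatException =>
              throw new IllegalArgumentException(s"$key should be long, but was $v")
          }
      }

    def main(args: Array[String]): Unit = {
      // Prints: 100 -> 100, 1k -> 1024, 1M -> 1048576, 1g -> 1073741824
      Seq("100", "1k", "1M", "1g").foreach(v => println(s"$v -> ${parseLongMem("demo.key", v)}"))
    }
  }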
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 43300cd635..a2eddc8fe1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -92,4 +92,43 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
}
assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10")
}
+
+ test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") {
+ sqlContext.conf.clear()
+
+ sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100")
+ assert(sqlContext.conf.targetPostShuffleInputSize === 100)
+
+ sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k")
+ assert(sqlContext.conf.targetPostShuffleInputSize === 1024)
+
+ sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M")
+ assert(sqlContext.conf.targetPostShuffleInputSize === 1048576)
+
+ sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g")
+ assert(sqlContext.conf.targetPostShuffleInputSize === 1073741824)
+
+ sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1")
+ assert(sqlContext.conf.targetPostShuffleInputSize === -1)
+
+ // Test overflow exception
+ intercept[IllegalArgumentException] {
+ // This value exceeds Long.MaxValue
+ // Utils.byteStringAsBytes("90000000000g")
+ sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "90000000000g")
+ }
+
+ intercept[IllegalArgumentException] {
+ // This value is less than Long.MinValue
+ // Utils.byteStringAsBytes("-90000000000g")
+ sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
+ }
+ // Test invalid input
+ intercept[IllegalArgumentException] {
+ // Negative memory strings like -1g are rejected as invalid input
+ // Utils.byteStringAsBytes("-1g")
+ sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g")
+ }
+ sqlContext.conf.clear()
+ }
}
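A quick sanity check on the overflow case exercised above (my own arithmetic, not part of the patch): 90000000000g is 90000000000 * 2^30 bytes, far beyond what a Long can hold, so the setter has to fail.

  // Computed with BigInt to avoid the very overflow being demonstrated.
  val bytes = BigInt(90000000000L) * BigInt(1024L * 1024L * 1024L)
  println(bytes)                          // 96636764160000000000
  println(BigInt(Long.MaxValue))          // 9223372036854775807
  println(bytes > BigInt(Long.MaxValue))  // true, hence the IllegalArgumentException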