diff options
author | Kevin Yu <qyu@us.ibm.com> | 2016-01-07 21:13:17 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-07 21:13:17 -0800 |
commit | 5028a001d51a9e9a13e3c39f6a080618f3425d87 (patch) | |
tree | 774a461be1ac0b6b8f590ed9ac5a740ef37f49e5 /sql/core | |
parent | 28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4 (diff) | |
download | spark-5028a001d51a9e9a13e3c39f6a080618f3425d87.tar.gz spark-5028a001d51a9e9a13e3c39f6a080618f3425d87.tar.bz2 spark-5028a001d51a9e9a13e3c39f6a080618f3425d87.zip |
[SPARK-12317][SQL] Support units (m,k,g) in SQLConf
This PR is continue from previous closed PR 10314.
In this PR, SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE will be taken memory string conventions as input.
For example, the user can now specify 10g for SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE in SQLConf file.
marmbrus srowen : Can you help review this code changes ? Thanks.
Author: Kevin Yu <qyu@us.ibm.com>
Closes #10629 from kevinyu98/spark-12317.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 22 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala | 39 |
2 files changed, 60 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 26c00dc250..7976795ff5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.parser.ParserConf +import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines the configuration options for Spark SQL. @@ -115,6 +116,25 @@ private[spark] object SQLConf { } }, _.toString, doc, isPublic) + def longMemConf( + key: String, + defaultValue: Option[Long] = None, + doc: String = "", + isPublic: Boolean = true): SQLConfEntry[Long] = + SQLConfEntry(key, defaultValue, { v => + try { + v.toLong + } catch { + case _: NumberFormatException => + try { + Utils.byteStringAsBytes(v) + } catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"$key should be long, but was $v") + } + } + }, _.toString, doc, isPublic) + def doubleConf( key: String, defaultValue: Option[Double] = None, @@ -235,7 +255,7 @@ private[spark] object SQLConf { doc = "The default number of partitions to use when shuffling data for joins or aggregations.") val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE = - longConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", + longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", defaultValue = Some(64 * 1024 * 1024), doc = "The target post-shuffle input size in bytes of a task.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala index 43300cd635..a2eddc8fe1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala @@ -92,4 +92,43 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10") } + + test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") { + sqlContext.conf.clear() + + sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100") + assert(sqlContext.conf.targetPostShuffleInputSize === 100) + + sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k") + assert(sqlContext.conf.targetPostShuffleInputSize === 1024) + + sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M") + assert(sqlContext.conf.targetPostShuffleInputSize === 1048576) + + sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g") + assert(sqlContext.conf.targetPostShuffleInputSize === 1073741824) + + sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1") + assert(sqlContext.conf.targetPostShuffleInputSize === -1) + + // Test overflow exception + intercept[IllegalArgumentException] { + // This value exceeds Long.MaxValue + // Utils.byteStringAsBytes("90000000000g") + sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "90000000000g") + } + + intercept[IllegalArgumentException] { + // This value less than Int.MinValue + // Utils.byteStringAsBytes("-90000000000g") + sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g") + } + // Test invalid input + intercept[IllegalArgumentException] { + // This value exceeds Long.MaxValue + // Utils.byteStringAsBytes("-1g") + sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g") + } + sqlContext.conf.clear() + } } |