author | Yuming Wang <wgyumg@gmail.com> | 2017-03-08 11:31:01 +0000
---|---|---
committer | Sean Owen <sowen@cloudera.com> | 2017-03-08 11:31:01 +0000
commit | 3f9f9180c2e695ad468eb813df5feec41e169531 |
tree | f1d56e7a5b51beabaa340f326ba764466ebd507a /sql |
parent | 81303f7ca7808d51229411dce8feeed8c23dbe15 |
[SPARK-19693][SQL] Make SET mapreduce.job.reduces automatically convert to spark.sql.shuffle.partitions
## What changes were proposed in this pull request?
Make `SET mapreduce.job.reduces` automatically convert to `spark.sql.shuffle.partitions`, mirroring the existing handling of `SET mapred.reduce.tasks`.
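As a rough illustration of the intended behavior (a sketch against the public `SparkSession` API; the partition values are arbitrary examples, not part of the change):

```scala
// Sketch of the intended behavior in a spark-shell session, where `spark`
// is the SparkSession bound by the shell.
spark.sql("SET mapreduce.job.reduces=10")
// Logs a warning that mapreduce.job.reduces is a Hadoop property and is
// converted to spark.sql.shuffle.partitions, then applies the value:
assert(spark.conf.get("spark.sql.shuffle.partitions") == "10")

// Values below 1 are rejected, as with SET mapred.reduce.tasks:
// spark.sql("SET mapreduce.job.reduces=-1")  // throws IllegalArgumentException
```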
## How was this patch tested?
Unit tests.
Author: Yuming Wang <wgyumg@gmail.com>
Closes #17020 from wangyum/SPARK-19693.
Diffstat (limited to 'sql')
3 files changed, 33 insertions, 0 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 7afa4e78a3..5f12830ee6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -60,6 +60,23 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       }
       (keyValueOutput, runFunc)
 
+    case Some((SQLConf.Replaced.MAPREDUCE_JOB_REDUCES, Some(value))) =>
+      val runFunc = (sparkSession: SparkSession) => {
+        logWarning(
+          s"Property ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} is Hadoop's property, " +
+            s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+        if (value.toInt < 1) {
+          val msg =
+            s"Setting negative ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} for automatically " +
+              "determining the number of reducers is not supported."
+          throw new IllegalArgumentException(msg)
+        } else {
+          sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, value)
+          Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
+        }
+      }
+      (keyValueOutput, runFunc)
+
     case Some((key @ SetCommand.VariableName(name), Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
         sparkSession.conf.set(name, value)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 461dfe3a66..fd3acd42e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -677,6 +677,10 @@ object SQLConf {
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
+
+  object Replaced {
+    val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 468ea05512..d9e0196c57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1019,6 +1019,18 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     spark.sessionState.conf.clear()
   }
 
+  test("SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions") {
+    spark.sessionState.conf.clear()
+    val before = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt
+    val newConf = before + 1
+    sql(s"SET mapreduce.job.reduces=${newConf.toString}")
+    val after = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt
+    assert(before != after)
+    assert(newConf === after)
+    intercept[IllegalArgumentException](sql(s"SET mapreduce.job.reduces=-1"))
+    spark.sessionState.conf.clear()
+  }
+
   test("apply schema") {
     val schema1 = StructType(
       StructField("f1", IntegerType, false) ::
```
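The new case arm mirrors the existing `mapred.reduce.tasks` handling noted in the description above. A condensed, self-contained sketch of that conversion shape (hypothetical names, not Spark's API; note the guard is `value.toInt < 1`, so 0 is rejected as well, even though the message only mentions negative values):

```scala
// Condensed sketch of the conversion shape used by the mapred.reduce.tasks
// and mapreduce.job.reduces arms of SetCommand. Names here are hypothetical.
object ReducerKeyConversion {
  val ShufflePartitions = "spark.sql.shuffle.partitions"

  /** Maps a Hadoop reducer-count key to the Spark SQL shuffle-partition key. */
  def convert(hadoopKey: String, value: String): (String, String) = {
    // The guard rejects anything below 1, i.e. zero as well as negatives.
    if (value.toInt < 1) {
      throw new IllegalArgumentException(
        s"Setting negative $hadoopKey for automatically determining " +
          "the number of reducers is not supported.")
    }
    ShufflePartitions -> value
  }
}

// Usage:
//   ReducerKeyConversion.convert("mapreduce.job.reduces", "10")
//   // == ("spark.sql.shuffle.partitions", "10")
```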