author    Yuming Wang <wgyumg@gmail.com>   2017-03-08 11:31:01 +0000
committer Sean Owen <sowen@cloudera.com>   2017-03-08 11:31:01 +0000
commit    3f9f9180c2e695ad468eb813df5feec41e169531 (patch)
tree      f1d56e7a5b51beabaa340f326ba764466ebd507a /sql
parent    81303f7ca7808d51229411dce8feeed8c23dbe15 (diff)
[SPARK-19693][SQL] Make SET mapreduce.job.reduces automatically convert to spark.sql.shuffle.partitions
## What changes were proposed in this pull request?

Make `SET mapreduce.job.reduces` automatically convert to `spark.sql.shuffle.partitions`, analogous to the existing handling of `SET mapred.reduce.tasks`.

## How was this patch tested?

Unit tests.

Author: Yuming Wang <wgyumg@gmail.com>

Closes #17020 from wangyum/SPARK-19693.
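As a quick illustration of the behavior this patch adds, here is a minimal sketch, assuming a local `SparkSession` (the builder settings, app name, and demo object name are illustrative, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession

object MapreduceJobReducesDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-19693-demo")
      .getOrCreate()

    // The Hadoop property is intercepted by SetCommand and rewritten to
    // spark.sql.shuffle.partitions; a warning is logged on conversion.
    spark.sql("SET mapreduce.job.reduces=10")

    // The Spark SQL property now carries the value.
    assert(spark.conf.get("spark.sql.shuffle.partitions") == "10")

    spark.stop()
  }
}
```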
Diffstat (limited to 'sql')
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala | 17 +++++++++++++++++
 sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala             |  4 ++++
 sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                | 12 ++++++++++++
 3 files changed, 33 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 7afa4e78a3..5f12830ee6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -60,6 +60,23 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       }
       (keyValueOutput, runFunc)
 
+    case Some((SQLConf.Replaced.MAPREDUCE_JOB_REDUCES, Some(value))) =>
+      val runFunc = (sparkSession: SparkSession) => {
+        logWarning(
+          s"Property ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} is Hadoop's property, " +
+            s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+        if (value.toInt < 1) {
+          val msg =
+            s"Setting negative ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} for automatically " +
+              "determining the number of reducers is not supported."
+          throw new IllegalArgumentException(msg)
+        } else {
+          sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, value)
+          Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
+        }
+      }
+      (keyValueOutput, runFunc)
+
     case Some((key @ SetCommand.VariableName(name), Some(value))) =>
       val runFunc = (sparkSession: SparkSession) => {
         sparkSession.conf.set(name, value)
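A usage note on the branch above: `SET` returns a one-row result whose key column carries the converted Spark property, not the Hadoop name that was typed. A hedged sketch, assuming a live `spark` session:

```scala
// SetCommand's keyValueOutput schema is (key: string, value: string); the
// new branch emits Row(SQLConf.SHUFFLE_PARTITIONS.key, value), so the
// returned key is the Spark name, not mapreduce.job.reduces.
val result = spark.sql("SET mapreduce.job.reduces=8").collect().head
assert(result.getString(0) == "spark.sql.shuffle.partitions")
assert(result.getString(1) == "8")
```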
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 461dfe3a66..fd3acd42e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -677,6 +677,10 @@ object SQLConf {
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
+
+  object Replaced {
+    val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
+  }
 }
 
 /**
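The new `Replaced` holder sits alongside the existing `Deprecated` one: `mapreduce.job.reduces` is the Hadoop 2 property name that replaced `mapred.reduce.tasks`, and after this patch both spellings funnel into the same Spark setting. A sketch, assuming a live `spark` session:

```scala
// Hadoop 1 name: handled via SQLConf.Deprecated, logs a deprecation warning.
spark.sql("SET mapred.reduce.tasks=4")
// Hadoop 2 replacement: handled via SQLConf.Replaced, logs a conversion warning.
spark.sql("SET mapreduce.job.reduces=4")
// Both end up in the same place.
assert(spark.conf.get("spark.sql.shuffle.partitions") == "4")
```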
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 468ea05512..d9e0196c57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1019,6 +1019,18 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     spark.sessionState.conf.clear()
   }
 
+  test("SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions") {
+    spark.sessionState.conf.clear()
+    val before = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt
+    val newConf = before + 1
+    sql(s"SET mapreduce.job.reduces=${newConf.toString}")
+    val after = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt
+    assert(before != after)
+    assert(newConf === after)
+    intercept[IllegalArgumentException](sql(s"SET mapreduce.job.reduces=-1"))
+    spark.sessionState.conf.clear()
+  }
+
   test("apply schema") {
     val schema1 = StructType(
       StructField("f1", IntegerType, false) ::
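One subtlety the test leaves implicit: the guard in the new SetCommand branch is `value.toInt < 1`, so zero is rejected as well as negative values, even though the error message only says "negative". A sketch, assuming a live `spark` session:

```scala
import scala.util.Try

// Both values trip the `value.toInt < 1` guard and surface as
// IllegalArgumentException.
assert(Try(spark.sql("SET mapreduce.job.reduces=0")).isFailure)
assert(Try(spark.sql("SET mapreduce.job.reduces=-1")).isFailure)
```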