author     gatorsmile <gatorsmile@gmail.com>    2016-05-21 20:07:34 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-05-21 20:07:34 -0700
commit     8f0a3d5bcba313dc3b70d4aa9a8ba2aa2d276062
tree       f35feb55bf0c1fe384f0cdf0882fc9648f7eb513
parent     c18fa464f404ed2612f8c4d355cb0544b355975b
[SPARK-15330][SQL] Implement Reset Command
#### What changes were proposed in this pull request?

Like the `SET` command, `RESET` is also supported by Hive. See the Hive CLI manual (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli) and the related Hive JIRA (https://issues.apache.org/jira/browse/HIVE-3202).

This PR implements such a command for resetting the SQL-related configuration to its default values. One of the use cases shown in HIVE-3202 is listed below:

> For the purpose of optimization we set various configs per query. It's worthy but all those configs should be reset every time for next query.

#### How was this patch tested?

Added a test case.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13121 from gatorsmile/resetCommand.
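As a quick end-to-end illustration of the intended behavior, here is a minimal sketch against the public `SparkSession` API (the conf key is only an example; `spark.sql.shuffle.partitions` defaults to 200):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("reset-demo").getOrCreate()

// Override a SQL conf for one query...
spark.sql("SET spark.sql.shuffle.partitions=10")
assert(spark.conf.get("spark.sql.shuffle.partitions") == "10")

// ...then drop every session-level override in one shot.
spark.sql("RESET")
assert(spark.conf.get("spark.sql.shuffle.partitions") == "200") // back to the default
```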
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4                        |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                         | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala                     | 16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala                            | 49
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala |  6
5 files changed, 82 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 06ac37b7f8..848c59e3b8 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -120,6 +120,7 @@ statement
| ADD identifier .*? #addResource
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
+ | RESET #resetConfiguration
| unsupportedHiveNativeCommands .*? #failNativeCommand
;
@@ -633,7 +634,7 @@ nonReserved
| GROUPING | CUBE | ROLLUP
| EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
| TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
- | SET
+ | SET | RESET
| VIEW | REPLACE
| IF
| NO | DATA
@@ -748,6 +749,7 @@ MAP: 'MAP';
STRUCT: 'STRUCT';
COMMENT: 'COMMENT';
SET: 'SET';
+RESET: 'RESET';
DATA: 'DATA';
START: 'START';
TRANSACTION: 'TRANSACTION';
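With the grammar change above, a bare `RESET` statement now matches the `#resetConfiguration` alternative instead of falling through to `#failNativeCommand`. A sketch of how that surfaces through the parser, reusing the `spark` session from the earlier example (`sessionState.sqlParser` is internal API, shown for illustration only):

```scala
import org.apache.spark.sql.execution.command.ResetCommand

// "RESET" now parses to the ResetCommand logical plan
// (defined in SetCommand.scala, below).
val plan = spark.sessionState.sqlParser.parsePlan("RESET")
assert(plan == ResetCommand)
```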
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2966eefd07..2e3ac9706d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -76,6 +76,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
/**
+ * Create a [[ResetCommand]] logical plan.
+ * Example SQL :
+ * {{{
+ * RESET;
+ * }}}
+ */
+ override def visitResetConfiguration(
+ ctx: ResetConfigurationContext): LogicalPlan = withOrigin(ctx) {
+ ResetCommand
+ }
+
+ /**
* Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN
* option (other options are passed on to Hive) e.g.:
* {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 282f26ce99..b0e2d03af0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -116,3 +116,19 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
override def run(sparkSession: SparkSession): Seq[Row] = runFunc(sparkSession)
}
+
+/**
+ * This command is for resetting SQLConf to the default values. Command that runs
+ * {{{
+ * reset;
+ * }}}
+ */
+case object ResetCommand extends RunnableCommand with Logging {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.conf.clear()
+ Seq.empty[Row]
+ }
+
+ override val output: Seq[Attribute] = Seq.empty
+}
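`ResetCommand` is deliberately thin: it delegates to `SQLConf.clear()`, which drops every session-level override so that subsequent reads fall back to each entry's default value, while user-defined keys (which have no default) disappear entirely. The programmatic equivalent, again via internal API and purely for illustration:

```scala
// What RESET does under the hood: wipe all session-level SQL conf overrides.
spark.sessionState.conf.clear()
```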
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 0296229100..f8227e3bd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -99,7 +99,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
test("deprecated property") {
spark.sqlContext.conf.clear()
val original = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
- try{
+ try {
sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
assert(spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) === 10)
} finally {
@@ -107,6 +107,53 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
}
}
+ test("reset - public conf") {
+ spark.sqlContext.conf.clear()
+ val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
+ try {
+ assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true)
+ sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=false")
+ assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === false)
+ assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 1)
+ sql(s"reset")
+ assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true)
+ assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 0)
+ } finally {
+ sql(s"set ${SQLConf.GROUP_BY_ORDINAL}=$original")
+ }
+ }
+
+ test("reset - internal conf") {
+ spark.sqlContext.conf.clear()
+ val original = spark.conf.get(SQLConf.NATIVE_VIEW)
+ try {
+ assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
+ sql(s"set ${SQLConf.NATIVE_VIEW.key}=false")
+ assert(spark.conf.get(SQLConf.NATIVE_VIEW) === false)
+ assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 1)
+ sql(s"reset")
+ assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
+ assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 0)
+ } finally {
+ sql(s"set ${SQLConf.NATIVE_VIEW}=$original")
+ }
+ }
+
+ test("reset - user-defined conf") {
+ spark.sqlContext.conf.clear()
+ val userDefinedConf = "x.y.z.reset"
+ try {
+ assert(spark.conf.getOption(userDefinedConf).isEmpty)
+ sql(s"set $userDefinedConf=false")
+ assert(spark.conf.get(userDefinedConf) === "false")
+ assert(sql(s"set").where(s"key = '$userDefinedConf'").count() == 1)
+ sql(s"reset")
+ assert(spark.conf.getOption(userDefinedConf).isEmpty)
+ } finally {
+ spark.conf.unset(userDefinedConf)
+ }
+ }
+
test("invalid conf value") {
spark.sqlContext.conf.clear()
val e = intercept[IllegalArgumentException] {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 1402e0a687..33ff8aee79 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -32,8 +32,8 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor,
- CommandProcessorFactory, SetProcessor}
+import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor}
+import org.apache.hadoop.hive.ql.processors.{CommandProcessorFactory, ResetProcessor, SetProcessor}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.thrift.transport.TSocket
@@ -312,7 +312,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
if (proc != null) {
// scalastyle:off println
if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
- proc.isInstanceOf[AddResourceProcessor]) {
+ proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ResetProcessor]) {
val driver = new SparkSQLDriver
driver.init()
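With this routing in place, a `RESET` typed at the `spark-sql` prompt is executed by Spark's own `ResetCommand` (via `SparkSQLDriver`) rather than by Hive's `ResetProcessor`, so it clears Spark's session conf. A hypothetical CLI session (output elided):

```sql
spark-sql> SET spark.sql.shuffle.partitions=10;
spark-sql> RESET;
spark-sql> SET spark.sql.shuffle.partitions; -- shows the default, 200, again
```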