author    Reynold Xin <rxin@databricks.com>    2016-04-15 20:28:09 -0700
committer Yin Huai <yhuai@databricks.com>      2016-04-15 20:28:09 -0700
commit    f4be0946af219379fb2476e6f80b2e50463adeb2 (patch)
tree      d4085adef6750a5315daa6fabef7f9c0218fca20 /sql
parent    b2dfa849599843269a43e6e0f2ab8c539dfc32b6 (diff)
[SPARK-14677][SQL] Make the max number of iterations configurable for Catalyst
## What changes were proposed in this pull request?

We currently hard-code the max number of optimizer/analyzer iterations to 100. This patch makes it configurable. While I'm at it, I also added the SessionCatalog to the optimizer, so we can use information there in optimization.

## How was this patch tested?

Updated unit tests to reflect the change.

Author: Reynold Xin <rxin@databricks.com>

Closes #12434 from rxin/SPARK-14677.
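As a usage sketch (not part of this patch; the method name and the value 50 are only illustrative), the new `spark.sql.optimizer.maxIterations` key can be set like any other SQL conf before queries are planned:

```scala
import org.apache.spark.sql.SQLContext

// Assumes an existing SQLContext; 100 remains the default when the key is unset.
def tuneOptimizerIterations(sqlContext: SQLContext): Unit = {
  // Cap the analyzer/optimizer fixed-point loop at 50 iterations instead of the default 100.
  sqlContext.setConf("spark.sql.optimizer.maxIterations", "50")
}
```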
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala | 32
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 19
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 36
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 2
9 files changed, 62 insertions, 64 deletions
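Before the diff itself, a condensed sketch of the pattern the patch applies (the class name, the empty batch list, and the value 50 are simplifications for illustration; the real changes are in Optimizer.scala and Analyzer.scala below): the fixed-point iteration cap is read from `CatalystConf.optimizerMaxIterations` instead of being hard-coded as `FixedPoint(100)`.

```scala
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Simplified stand-in for the real Optimizer: one shared, conf-driven fixed point.
class ConfigurableOptimizer(conf: CatalystConf) extends RuleExecutor[LogicalPlan] {
  protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)

  // The real rule batches live in Optimizer.scala; an empty list keeps this sketch compilable.
  def batches: Seq[Batch] = Nil
}

object ConfigurableOptimizerSketch {
  // In tests the cap can be overridden through SimpleCatalystConf.
  val testOptimizer = new ConfigurableOptimizer(
    SimpleCatalystConf(caseSensitiveAnalysis = true, optimizerMaxIterations = 50))
}
```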
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 2b98aacdd7..abba866821 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -19,46 +19,32 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis._
-private[spark] trait CatalystConf {
+/**
+ * Interface for configuration options used in the catalyst module.
+ */
+trait CatalystConf {
def caseSensitiveAnalysis: Boolean
def orderByOrdinal: Boolean
def groupByOrdinal: Boolean
+ def optimizerMaxIterations: Int
+
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
*/
def resolver: Resolver = {
- if (caseSensitiveAnalysis) {
- caseSensitiveResolution
- } else {
- caseInsensitiveResolution
- }
+ if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
}
}
-/**
- * A trivial conf that is empty. Used for testing when all
- * relations are already filled in and the analyser needs only to resolve attribute references.
- */
-object EmptyConf extends CatalystConf {
- override def caseSensitiveAnalysis: Boolean = {
- throw new UnsupportedOperationException
- }
- override def orderByOrdinal: Boolean = {
- throw new UnsupportedOperationException
- }
- override def groupByOrdinal: Boolean = {
- throw new UnsupportedOperationException
- }
-}
/** A CatalystConf that can be used for local testing. */
case class SimpleCatalystConf(
caseSensitiveAnalysis: Boolean,
orderByOrdinal: Boolean = true,
- groupByOrdinal: Boolean = true)
-
+ groupByOrdinal: Boolean = true,
+ optimizerMaxIterations: Int = 100)
extends CatalystConf {
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index de40ddde1b..37ff6ab6f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -39,14 +39,13 @@ import org.apache.spark.sql.types._
* Used for testing when all relations are already filled in and the analyzer needs only
* to resolve attribute references.
*/
-object SimpleAnalyzer
- extends SimpleAnalyzer(
- EmptyFunctionRegistry,
+object SimpleAnalyzer extends Analyzer(
+ new SessionCatalog(
+ new InMemoryCatalog,
+ EmptyFunctionRegistry,
+ new SimpleCatalystConf(caseSensitiveAnalysis = true)),
new SimpleCatalystConf(caseSensitiveAnalysis = true))
-class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
- extends Analyzer(new SessionCatalog(new InMemoryCatalog, functionRegistry, conf), conf)
-
/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a
@@ -55,9 +54,13 @@ class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
class Analyzer(
catalog: SessionCatalog,
conf: CatalystConf,
- maxIterations: Int = 100)
+ maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {
+ def this(catalog: SessionCatalog, conf: CatalystConf) = {
+ this(catalog, conf, conf.optimizerMaxIterations)
+ }
+
def resolver: Resolver = {
if (conf.caseSensitiveAnalysis) {
caseSensitiveResolution
@@ -66,7 +69,7 @@ class Analyzer(
}
}
- val fixedPoint = FixedPoint(maxIterations)
+ protected val fixedPoint = FixedPoint(maxIterations)
/**
* Override to provide additional rules for the "Resolution" batch.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bb68ef826f..6c8f8f40dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import scala.collection.immutable.HashSet
-import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf}
+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
@@ -36,9 +36,11 @@ import org.apache.spark.sql.types._
* Abstract class all optimizers should inherit of, contains the standard batches (extending
* Optimizers can override this.
*/
-abstract class Optimizer(
- conf: CatalystConf,
- sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] {
+abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
+ extends RuleExecutor[LogicalPlan] {
+
+ protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+
def batches: Seq[Batch] = {
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
@@ -59,12 +61,12 @@ abstract class Optimizer(
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
CombineUnions) ::
- Batch("Replace Operators", FixedPoint(100),
+ Batch("Replace Operators", fixedPoint,
ReplaceIntersectWithSemiJoin,
ReplaceDistinctWithAggregate) ::
- Batch("Aggregate", FixedPoint(100),
+ Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions) ::
- Batch("Operator Optimizations", FixedPoint(100),
+ Batch("Operator Optimizations", fixedPoint,
// Operator push down
SetOperationPushDown,
SamplePushDown,
@@ -95,11 +97,11 @@ abstract class Optimizer(
SimplifyCasts,
SimplifyCaseConversionExpressions,
EliminateSerialization) ::
- Batch("Decimal Optimizations", FixedPoint(100),
+ Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) ::
- Batch("Typed Filter Optimization", FixedPoint(100),
+ Batch("Typed Filter Optimization", fixedPoint,
EmbedSerializerInFilter) ::
- Batch("LocalRelation", FixedPoint(100),
+ Batch("LocalRelation", fixedPoint,
ConvertToLocalRelation) ::
Batch("Subquery", Once,
OptimizeSubqueries) :: Nil
@@ -117,15 +119,19 @@ abstract class Optimizer(
}
/**
- * Non-abstract representation of the standard Spark optimizing strategies
+ * An optimizer used in test code.
*
* To ensure extendability, we leave the standard rules in the abstract optimizer rules, while
* specific rules go to the subclasses
*/
-object DefaultOptimizer
- extends Optimizer(
- EmptyConf,
- new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf))
+object SimpleTestOptimizer extends SimpleTestOptimizer
+
+class SimpleTestOptimizer extends Optimizer(
+ new SessionCatalog(
+ new InMemoryCatalog,
+ EmptyFunctionRegistry,
+ new SimpleCatalystConf(caseSensitiveAnalysis = true)),
+ new SimpleCatalystConf(caseSensitiveAnalysis = true))
/**
* Pushes operations down into a Sample.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index cf26d4843d..faa90fb1c5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.Utils
@@ -153,7 +153,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
expected: Any,
inputRow: InternalRow = EmptyRow): Unit = {
val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
- val optimizedPlan = DefaultOptimizer.execute(plan)
+ val optimizedPlan = SimpleTestOptimizer.execute(plan)
checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index 27195d3458..452792d21c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
import org.apache.spark.sql.types._
@@ -151,7 +151,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
expression: Expression,
inputRow: InternalRow = EmptyRow): Unit = {
val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
- val optimizedPlan = DefaultOptimizer.execute(plan)
+ val optimizedPlan = SimpleTestOptimizer.execute(plan)
checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
index 55af6c5d6a..7112c033ea 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
@@ -15,12 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst
+package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
@@ -40,10 +37,7 @@ class OptimizerExtendableSuite extends SparkFunSuite {
* This class represents a dummy extended optimizer that takes the batches of the
* Optimizer and adds custom ones.
*/
- class ExtendedOptimizer
- extends Optimizer(
- EmptyConf,
- new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)) {
+ class ExtendedOptimizer extends SimpleTestOptimizer {
// rules set to DummyRule, would not be executed anyways
val myBatches: Seq[Batch] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 8dfbba779d..08b2d7fcd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -18,14 +18,16 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.ExperimentalMethods
-import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.internal.SQLConf
class SparkOptimizer(
- conf: CatalystConf,
- sessionCatalog: SessionCatalog,
- experimentalMethods: ExperimentalMethods) extends Optimizer(conf, sessionCatalog) {
+ catalog: SessionCatalog,
+ conf: SQLConf,
+ experimentalMethods: ExperimentalMethods)
+ extends Optimizer(catalog, conf) {
+
override def batches: Seq[Batch] = super.batches :+ Batch(
- "User Provided Optimizers", FixedPoint(100), experimentalMethods.extraOptimizations: _*)
+ "User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 20d9a28548..e58b7178e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -51,6 +51,11 @@ object SQLConf {
}
+ val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
+ .doc("The max number of iterations the optimizer and analyzer runs")
+ .intConf
+ .createWithDefault(100)
+
val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
.doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
"When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -473,6 +478,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
/** ************************ Spark SQL Params/Hints ******************* */
+ def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
+
def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 10497e4fdf..c30f879ded 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* Logical query plan optimizer.
*/
- lazy val optimizer: Optimizer = new SparkOptimizer(conf, catalog, experimentalMethods)
+ lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)
/**
* Parser that extracts expressions, plans, table identifiers etc. from SQL texts.