aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-08-04 10:07:53 -0700
committerMichael Armbrust <michael@databricks.com>2015-08-04 10:07:53 -0700
commit34a0eb2e89d59b0823efc035ddf2dc93f19540c1 (patch)
treedbbe8f18d9e49e3d749f17864c2b7a913e00d61b /sql
parent6a0f8b994de36b7a7bdfb9958d39dbd011776107 (diff)
downloadspark-34a0eb2e89d59b0823efc035ddf2dc93f19540c1.tar.gz
spark-34a0eb2e89d59b0823efc035ddf2dc93f19540c1.tar.bz2
spark-34a0eb2e89d59b0823efc035ddf2dc93f19540c1.zip
[SPARK-9512][SQL] Revert SPARK-9251, Allow evaluation while sorting
The analysis rule has a bug and we ended up making the sorter still capable of doing evaluation, so lets revert this for now. Author: Michael Armbrust <michael@databricks.com> Closes #7906 from marmbrus/revertSortProjection and squashes the following commits: 2da6972 [Michael Armbrust] unrevert unrelated changes 4f2b00c [Michael Armbrust] Revert "[SPARK-9251][SQL] do not order by expressions which still need evaluation"
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala58
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala13
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala36
3 files changed, 8 insertions, 99 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f5daba1543..ca17f3e3d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -79,7 +79,6 @@ class Analyzer(
ExtractWindowExpressions ::
GlobalAggregates ::
UnresolvedHavingClauseAttributes ::
- RemoveEvaluationFromSort ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
@@ -955,63 +954,6 @@ class Analyzer(
Project(p.output, newPlan.withNewChildren(newChild :: Nil))
}
}
-
- /**
- * Removes all still-need-evaluate ordering expressions from sort and use an inner project to
- * materialize them, finally use a outer project to project them away to keep the result same.
- * Then we can make sure we only sort by [[AttributeReference]]s.
- *
- * As an example,
- * {{{
- * Sort('a, 'b + 1,
- * Relation('a, 'b))
- * }}}
- * will be turned into:
- * {{{
- * Project('a, 'b,
- * Sort('a, '_sortCondition,
- * Project('a, 'b, ('b + 1).as("_sortCondition"),
- * Relation('a, 'b))))
- * }}}
- */
- object RemoveEvaluationFromSort extends Rule[LogicalPlan] {
- private def hasAlias(expr: Expression) = {
- expr.find {
- case a: Alias => true
- case _ => false
- }.isDefined
- }
-
- override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- // The ordering expressions have no effect to the output schema of `Sort`,
- // so `Alias`s in ordering expressions are unnecessary and we should remove them.
- case s @ Sort(ordering, _, _) if ordering.exists(hasAlias) =>
- val newOrdering = ordering.map(_.transformUp {
- case Alias(child, _) => child
- }.asInstanceOf[SortOrder])
- s.copy(order = newOrdering)
-
- case s @ Sort(ordering, global, child)
- if s.expressions.forall(_.resolved) && s.childrenResolved && !s.hasNoEvaluation =>
-
- val (ref, needEval) = ordering.partition(_.child.isInstanceOf[AttributeReference])
-
- val namedExpr = needEval.map(_.child match {
- case n: NamedExpression => n
- case e => Alias(e, "_sortCondition")()
- })
-
- val newOrdering = ref ++ needEval.zip(namedExpr).map { case (order, ne) =>
- order.copy(child = ne.toAttribute)
- }
-
- // Add still-need-evaluate ordering expressions into inner project and then project
- // them away after the sort.
- Project(child.output,
- Sort(newOrdering, global,
- Project(child.output ++ namedExpr, child)))
- }
- }
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index aacfc86ab0..7c404722d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -34,7 +34,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
}.nonEmpty
)
- expressions.forall(_.resolved) && childrenResolved && !hasSpecialExpressions
+ !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
}
}
@@ -68,7 +68,7 @@ case class Generate(
generator.resolved &&
childrenResolved &&
generator.elementTypes.length == generatorOutput.length &&
- generatorOutput.forall(_.resolved)
+ !generatorOutput.exists(!_.resolved)
}
// we don't want the gOutput to be taken as part of the expressions
@@ -188,7 +188,7 @@ case class WithWindowDefinition(
}
/**
- * @param order The ordering expressions, should all be [[AttributeReference]]
+ * @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
* False means sorting only apply within the partition.
* @param child Child logical plan
@@ -198,11 +198,6 @@ case class Sort(
global: Boolean,
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
-
- def hasNoEvaluation: Boolean = order.forall(_.child.isInstanceOf[AttributeReference])
-
- override lazy val resolved: Boolean =
- expressions.forall(_.resolved) && childrenResolved && hasNoEvaluation
}
case class Aggregate(
@@ -217,7 +212,7 @@ case class Aggregate(
}.nonEmpty
)
- expressions.forall(_.resolved) && childrenResolved && !hasWindowExpressions
+ !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
}
lazy val newAggregation: Option[Aggregate] = Utils.tryConvert(this)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index a86cefe941..221b4e92f0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -165,39 +165,11 @@ class AnalysisSuite extends AnalysisTest {
test("pull out nondeterministic expressions from Sort") {
val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation)
- val analyzed = caseSensitiveAnalyzer.execute(plan)
- analyzed.transform {
- case s: Sort if s.expressions.exists(!_.deterministic) =>
- fail("nondeterministic expressions are not allowed in Sort")
- }
- }
-
- test("remove still-need-evaluate ordering expressions from sort") {
- val a = testRelation2.output(0)
- val b = testRelation2.output(1)
-
- def makeOrder(e: Expression): SortOrder = SortOrder(e, Ascending)
-
- val noEvalOrdering = makeOrder(a)
- val noEvalOrderingWithAlias = makeOrder(Alias(Alias(b, "name1")(), "name2")())
-
- val needEvalExpr = Coalesce(Seq(a, Literal("1")))
- val needEvalExpr2 = Coalesce(Seq(a, b))
- val needEvalOrdering = makeOrder(needEvalExpr)
- val needEvalOrdering2 = makeOrder(needEvalExpr2)
-
- val plan = Sort(
- Seq(noEvalOrdering, noEvalOrderingWithAlias, needEvalOrdering, needEvalOrdering2),
- false, testRelation2)
-
- val evaluatedOrdering = makeOrder(AttributeReference("_sortCondition", StringType)())
- val materializedExprs = Seq(needEvalExpr, needEvalExpr2).map(e => Alias(e, "_sortCondition")())
-
+ val projected = Alias(Rand(33), "_nondeterministic")()
val expected =
- Project(testRelation2.output,
- Sort(Seq(makeOrder(a), makeOrder(b), evaluatedOrdering, evaluatedOrdering), false,
- Project(testRelation2.output ++ materializedExprs, testRelation2)))
-
+ Project(testRelation.output,
+ Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false,
+ Project(testRelation.output :+ projected, testRelation)))
checkAnalysis(plan, expected)
}
}