diff options
author | Wenchen Fan <cloud0fan@outlook.com> | 2015-07-20 09:42:18 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-07-20 09:42:18 -0700 |
commit | 04db58ae30d2f73af45b7e6813f97be62dc92095 (patch) | |
tree | c163ba9a2e8b4119d7e6585977e44a82fd7a49af /sql | |
parent | a15ecd057a6226e5cf83ca05c46748624a1cfc8c (diff) | |
download | spark-04db58ae30d2f73af45b7e6813f97be62dc92095.tar.gz spark-04db58ae30d2f73af45b7e6813f97be62dc92095.tar.bz2 spark-04db58ae30d2f73af45b7e6813f97be62dc92095.zip |
[SPARK-9186][SQL] Make `deterministic` describe the whole expression tree rather than only the expression itself
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes #7525 from cloud-fan/deterministic and squashes the following commits:
4189bfa [Wenchen Fan] make deterministic describing the tree rather than the expression
Diffstat (limited to 'sql')
5 files changed, 22 insertions, 16 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index d0a1aa9a1e..da599b8963 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -65,10 +65,12 @@ abstract class Expression extends TreeNode[Expression] { * Note that this means that an expression should be considered as non-deterministic if: * - if it relies on some mutable internal state, or * - if it relies on some implicit input that is not part of the children expression list. + * - if it has non-deterministic child or children. * * An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext. + * By default leaf expressions are deterministic as Nil.forall(_.deterministic) returns true. */ - def deterministic: Boolean = true + def deterministic: Boolean = children.forall(_.deterministic) def nullable: Boolean @@ -183,6 +185,14 @@ trait Unevaluable { self: Expression => throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") } +/** + * An expression that is nondeterministic. + */ +trait Nondeterministic { self: Expression => + + override def deterministic: Boolean = false +} + /** * A leaf expression, i.e. one without any child expressions. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala index 822898e561..aef24a5486 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala @@ -32,7 +32,9 @@ import org.apache.spark.util.random.XORShiftRandom * * Since this expression is stateful, it cannot be a case object. 
*/ -abstract class RDG(seed: Long) extends LeafExpression with Serializable { +abstract class RDG extends LeafExpression with Nondeterministic { + + protected def seed: Long /** * Record ID within each partition. By being transient, the Random Number Generator is @@ -40,15 +42,13 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable { */ @transient protected lazy val rng = new XORShiftRandom(seed + TaskContext.getPartitionId) - override def deterministic: Boolean = false - override def nullable: Boolean = false override def dataType: DataType = DoubleType } /** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */ -case class Rand(seed: Long) extends RDG(seed) { +case class Rand(seed: Long) extends RDG { override def eval(input: InternalRow): Double = rng.nextDouble() def this() = this(Utils.random.nextLong()) @@ -71,7 +71,7 @@ case class Rand(seed: Long) extends RDG(seed) { } /** Generate a random column with i.i.d. gaussian random distribution. */ -case class Randn(seed: Long) extends RDG(seed) { +case class Randn(seed: Long) extends RDG { override def eval(input: InternalRow): Double = rng.nextGaussian() def this() = this(Utils.random.nextLong()) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0f28a0d2c8..fafdae07c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -216,9 +216,9 @@ object ProjectCollapsing extends Rule[LogicalPlan] { // We only collapse these two Projects if their overlapped expressions are all // deterministic. 
- val hasNondeterministic = projectList1.flatMap(_.collect { + val hasNondeterministic = projectList1.exists(_.collect { case a: Attribute if aliasMap.contains(a) => aliasMap(a).child - }).exists(_.find(!_.deterministic).isDefined) + }.exists(!_.deterministic)) if (hasNondeterministic) { p diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala index 4d8ed08973..2645eb1854 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.expressions import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.LeafExpression +import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} import org.apache.spark.sql.types.{LongType, DataType} @@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{LongType, DataType} * * Since this expression is stateful, it cannot be a case object. */ -private[sql] case class MonotonicallyIncreasingID() extends LeafExpression { +private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic { /** * Record ID within each partition. 
By being transient, count's value is reset to 0 every time @@ -43,8 +43,6 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression { @transient private lazy val partitionMask = TaskContext.getPartitionId().toLong << 33 - override def deterministic: Boolean = false - override def nullable: Boolean = false override def dataType: DataType = LongType diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala index 43ffc9cc84..53ddd47e3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.expressions import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.LeafExpression +import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} import org.apache.spark.sql.types.{IntegerType, DataType} @@ -27,9 +27,7 @@ import org.apache.spark.sql.types.{IntegerType, DataType} /** * Expression that returns the current partition id of the Spark task. */ -private[sql] case object SparkPartitionID extends LeafExpression { - - override def deterministic: Boolean = false +private[sql] case object SparkPartitionID extends LeafExpression with Nondeterministic { override def nullable: Boolean = false |