author    Wenchen Fan <cloud0fan@outlook.com>    2015-07-20 09:42:18 -0700
committer Reynold Xin <rxin@databricks.com>      2015-07-20 09:42:18 -0700
commit    04db58ae30d2f73af45b7e6813f97be62dc92095 (patch)
tree      c163ba9a2e8b4119d7e6585977e44a82fd7a49af /sql
parent    a15ecd057a6226e5cf83ca05c46748624a1cfc8c (diff)
[SPARK-9186][SQL] make deterministic describing the tree rather than the expression
Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7525 from cloud-fan/deterministic and squashes the following commits:

4189bfa [Wenchen Fan] make deterministic describing the tree rather than the expression
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala | 12
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala | 6
5 files changed, 22 insertions(+), 16 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index d0a1aa9a1e..da599b8963 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -65,10 +65,12 @@ abstract class Expression extends TreeNode[Expression] {
* Note that this means that an expression should be considered as non-deterministic if:
* - if it relies on some mutable internal state, or
* - if it relies on some implicit input that is not part of the children expression list.
+ * - if it has one or more non-deterministic children.
*
* An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext.
+ * By default, leaf expressions are deterministic, since `Nil.forall(_.deterministic)` returns true.
*/
- def deterministic: Boolean = true
+ def deterministic: Boolean = children.forall(_.deterministic)
def nullable: Boolean
@@ -183,6 +185,14 @@ trait Unevaluable { self: Expression =>
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
}
+/**
+ * An expression that is nondeterministic.
+ */
+trait Nondeterministic { self: Expression =>
+
+ override def deterministic: Boolean = false
+}
+
/**
* A leaf expression, i.e. one without any child expressions.
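
The heart of the change is the new default above: `deterministic` now describes the whole subtree rather than a single node, because the default implementation delegates to `children.forall(_.deterministic)`, and a leaf's empty child list makes it vacuously true. A minimal, self-contained sketch of the new semantics; the names `Expr`, `Lit`, `Add`, and `RandLeaf` are illustrative stand-ins, not Spark's API:

// Sketch of tree-described determinism (illustrative names, not Spark's API).
trait Expr {
  def children: Seq[Expr]
  // A node is deterministic iff all of its children are; for a leaf,
  // children is Nil and Nil.forall(_.deterministic) is vacuously true.
  def deterministic: Boolean = children.forall(_.deterministic)
}

trait Nondet extends Expr {
  override def deterministic: Boolean = false
}

case class Lit(value: Int) extends Expr { def children: Seq[Expr] = Nil }
case class Add(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }
case class RandLeaf() extends Expr with Nondet { def children: Seq[Expr] = Nil }

object DeterminismDemo extends App {
  println(Add(Lit(1), Lit(2)).deterministic)     // true
  println(Add(Lit(1), RandLeaf()).deterministic) // false: nondeterminism bubbles up
}

With this in place, a subclass only has to flag itself (here via the `Nondet` mix-in, mirroring the commit's `Nondeterministic` trait) and every ancestor reports the right answer for free.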
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
index 822898e561..aef24a5486 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
@@ -32,7 +32,9 @@ import org.apache.spark.util.random.XORShiftRandom
*
* Since this expression is stateful, it cannot be a case object.
*/
-abstract class RDG(seed: Long) extends LeafExpression with Serializable {
+abstract class RDG extends LeafExpression with Nondeterministic {
+
+ protected def seed: Long
/**
* Record ID within each partition. By being transient, the Random Number Generator is
@@ -40,15 +42,13 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable {
*/
@transient protected lazy val rng = new XORShiftRandom(seed + TaskContext.getPartitionId)
- override def deterministic: Boolean = false
-
override def nullable: Boolean = false
override def dataType: DataType = DoubleType
}
/** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */
-case class Rand(seed: Long) extends RDG(seed) {
+case class Rand(seed: Long) extends RDG {
override def eval(input: InternalRow): Double = rng.nextDouble()
def this() = this(Utils.random.nextLong())
@@ -71,7 +71,7 @@ case class Rand(seed: Long) extends RDG(seed) {
}
/** Generate a random column with i.i.d. values drawn from the standard Gaussian distribution. */
-case class Randn(seed: Long) extends RDG(seed) {
+case class Randn(seed: Long) extends RDG {
override def eval(input: InternalRow): Double = rng.nextGaussian()
def this() = this(Utils.random.nextLong())
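
Note the refactoring pattern above: the superclass constructor parameter `seed` becomes an abstract `protected def seed`, which each case class's `seed` parameter then implements automatically, and the manual `deterministic` override disappears in favor of the `Nondeterministic` mix-in. A self-contained sketch of the same pattern outside Spark (the `SeededGen` and `Uniform` names are illustrative, not Spark code):

import scala.util.Random

// Sketch of the abstract-member pattern used by RDG (illustrative).
abstract class SeededGen extends Serializable {
  protected def seed: Long
  // Transient + lazy: each deserialized copy rebuilds its generator from the seed.
  @transient protected lazy val rng = new Random(seed)
}

// The case-class parameter `seed` automatically implements `protected def seed`.
case class Uniform(seed: Long) extends SeededGen {
  def next(): Double = rng.nextDouble()
}

object SeedDemo extends App {
  val u = Uniform(42L)
  println(u.next()) // same seed => same first draw
}

Keeping `seed` a case-class parameter (rather than a plain constructor argument passed to the superclass) preserves equality, `copy`, and pattern matching for `Rand` and `Randn`.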
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0f28a0d2c8..fafdae07c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -216,9 +216,9 @@ object ProjectCollapsing extends Rule[LogicalPlan] {
// We only collapse these two Projects if their overlapped expressions are all
// deterministic.
- val hasNondeterministic = projectList1.flatMap(_.collect {
+ val hasNondeterministic = projectList1.exists(_.collect {
case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
- }).exists(_.find(!_.deterministic).isDefined)
+ }.exists(!_.deterministic))
if (hasNondeterministic) {
p
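
The optimizer rewrite leans directly on the new semantics: since a node's `deterministic` now already accounts for its whole subtree, the explicit tree search `find(!_.deterministic).isDefined` can become a plain `!_.deterministic` check, and the `flatMap` plus second pass collapses into a short-circuiting `exists`. A small self-contained sketch of the collection idiom, using plain Scala values rather than Catalyst types:

// Sketch of the before/after idiom on plain collections (not Catalyst types).
// Each inner Seq stands for the children collected from one project entry;
// the Boolean stands for a child's (tree-wide) deterministic flag.
object OptimizerIdiomDemo extends App {
  val collectedChildren: Seq[Seq[Boolean]] = Seq(
    Seq(true, true),
    Seq(true, false) // one nondeterministic child
  )

  // Before: flatMap materializes every collected child, then a second pass scans.
  val before = collectedChildren.flatMap(identity).exists(d => !d)

  // After: exists short-circuits as soon as one nondeterministic child is found.
  val after = collectedChildren.exists(_.exists(d => !d))

  assert(before && after) // both report the nondeterministic child
}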
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index 4d8ed08973..2645eb1854 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.expressions
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.LeafExpression
+import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.types.{LongType, DataType}
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{LongType, DataType}
*
* Since this expression is stateful, it cannot be a case object.
*/
-private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
+private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
/**
* Record ID within each partition. By being transient, count's value is reset to 0 every time
@@ -43,8 +43,6 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
@transient private lazy val partitionMask = TaskContext.getPartitionId().toLong << 33
- override def deterministic: Boolean = false
-
override def nullable: Boolean = false
override def dataType: DataType = LongType
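
`MonotonicallyIncreasingID` is nondeterministic for a concrete reason visible in the context above: its value combines per-task mutable state (a record counter) with the partition id, packed via the 33-bit shift, so re-running a task can assign different ids to the same logical row. A standalone sketch of that id scheme, with the `TaskContext` dependency replaced by an explicit `partitionId` parameter (illustrative, not Spark code):

// Standalone sketch of the MonotonicallyIncreasingID scheme (illustrative).
final class IncreasingId(partitionId: Int) {
  // Partition id in the upper bits, per-partition record counter in the
  // lower 33 bits, matching the shift shown in the diff above.
  private val partitionMask: Long = partitionId.toLong << 33
  private var count: Long = 0L

  def next(): Long = {
    val id = partitionMask + count
    count += 1
    id
  }
}

object IdDemo extends App {
  val gen = new IncreasingId(partitionId = 2)
  println(gen.next() == (2L << 33))       // true: first id in partition 2
  println(gen.next() == (2L << 33) + 1)   // true: increasing within the partition
}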
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
index 43ffc9cc84..53ddd47e3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.expressions
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.LeafExpression
+import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.types.{IntegerType, DataType}
@@ -27,9 +27,7 @@ import org.apache.spark.sql.types.{IntegerType, DataType}
/**
* Expression that returns the current partition id of the Spark task.
*/
-private[sql] case object SparkPartitionID extends LeafExpression {
-
- override def deterministic: Boolean = false
+private[sql] case object SparkPartitionID extends LeafExpression with Nondeterministic {
override def nullable: Boolean = false