diff options
author | Reynold Xin <rxin@databricks.com> | 2015-07-15 14:52:02 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-07-15 14:52:02 -0700 |
commit | affbe329ae0100bd50a3c3fb081b0f2b07efce33 (patch) | |
tree | 66b453dd4d58de5bc8a58e8daf75529b9669135f | |
parent | 674eb2a4c3ff595760f990daf369ba75d2547593 (diff) | |
download | spark-affbe329ae0100bd50a3c3fb081b0f2b07efce33.tar.gz spark-affbe329ae0100bd50a3c3fb081b0f2b07efce33.tar.bz2 spark-affbe329ae0100bd50a3c3fb081b0f2b07efce33.zip |
[SPARK-9071][SQL] MonotonicallyIncreasingID and SparkPartitionID should be marked as nondeterministic.
I also took the chance to more explicitly define the semantics of deterministic.
Author: Reynold Xin <rxin@databricks.com>
Closes #7428 from rxin/non-deterministic and squashes the following commits:
a760827 [Reynold Xin] [SPARK-9071][SQL] MonotonicallyIncreasingID and SparkPartitionID should be marked as nondeterministic.
3 files changed, 14 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 3f19ac2b59..7b37ae7335 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -61,9 +61,15 @@ abstract class Expression extends TreeNode[Expression] { def foldable: Boolean = false /** - * Returns true when the current expression always return the same result for fixed input values. + * Returns true when the current expression always return the same result for fixed inputs from + * children. + * + * Note that this means that an expression should be considered as non-deterministic if: + * - if it relies on some mutable internal state, or + * - if it relies on some implicit input that is not part of the children expression list. + * + * An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext. */ - // TODO: Need to define explicit input values vs implicit input values. def deterministic: Boolean = true def nullable: Boolean diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala index 69a37750d7..fec403fe2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala @@ -41,7 +41,9 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression { */ @transient private[this] var count: Long = 0L - @transient private lazy val partitionMask = TaskContext.getPartitionId.toLong << 33 + @transient private lazy val partitionMask = TaskContext.getPartitionId().toLong << 33 + + override def deterministic: Boolean = false override def nullable: Boolean = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala index 5f1b514f2c..7c790c549a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala @@ -29,11 +29,13 @@ import org.apache.spark.sql.types.{IntegerType, DataType} */ private[sql] case object SparkPartitionID extends LeafExpression { + override def deterministic: Boolean = false + override def nullable: Boolean = false override def dataType: DataType = IntegerType - @transient private lazy val partitionId = TaskContext.getPartitionId + @transient private lazy val partitionId = TaskContext.getPartitionId() override def eval(input: InternalRow): Int = partitionId |