aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-07-15 14:52:02 -0700
committerReynold Xin <rxin@databricks.com>2015-07-15 14:52:02 -0700
commitaffbe329ae0100bd50a3c3fb081b0f2b07efce33 (patch)
tree66b453dd4d58de5bc8a58e8daf75529b9669135f
parent674eb2a4c3ff595760f990daf369ba75d2547593 (diff)
downloadspark-affbe329ae0100bd50a3c3fb081b0f2b07efce33.tar.gz
spark-affbe329ae0100bd50a3c3fb081b0f2b07efce33.tar.bz2
spark-affbe329ae0100bd50a3c3fb081b0f2b07efce33.zip
[SPARK-9071][SQL] MonotonicallyIncreasingID and SparkPartitionID should be marked as nondeterministic.
I also took the chance to more explicitly define the semantics of deterministic. Author: Reynold Xin <rxin@databricks.com> Closes #7428 from rxin/non-deterministic and squashes the following commits: a760827 [Reynold Xin] [SPARK-9071][SQL] MonotonicallyIncreasingID and SparkPartitionID should be marked as nondeterministic.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala4
3 files changed, 14 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 3f19ac2b59..7b37ae7335 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -61,9 +61,15 @@ abstract class Expression extends TreeNode[Expression] {
def foldable: Boolean = false
/**
- * Returns true when the current expression always return the same result for fixed input values.
+ * Returns true when the current expression always returns the same result for fixed inputs from
+ * children.
+ *
+ * Note that this means that an expression should be considered non-deterministic if:
+ * - it relies on some mutable internal state, or
+ * - it relies on some implicit input that is not part of the children expression list.
+ *
+ * An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext.
*/
- // TODO: Need to define explicit input values vs implicit input values.
def deterministic: Boolean = true
def nullable: Boolean
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index 69a37750d7..fec403fe2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -41,7 +41,9 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
*/
@transient private[this] var count: Long = 0L
- @transient private lazy val partitionMask = TaskContext.getPartitionId.toLong << 33
+ @transient private lazy val partitionMask = TaskContext.getPartitionId().toLong << 33
+
+ override def deterministic: Boolean = false
override def nullable: Boolean = false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
index 5f1b514f2c..7c790c549a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -29,11 +29,13 @@ import org.apache.spark.sql.types.{IntegerType, DataType}
*/
private[sql] case object SparkPartitionID extends LeafExpression {
+ override def deterministic: Boolean = false
+
override def nullable: Boolean = false
override def dataType: DataType = IntegerType
- @transient private lazy val partitionId = TaskContext.getPartitionId
+ @transient private lazy val partitionId = TaskContext.getPartitionId()
override def eval(input: InternalRow): Int = partitionId