author     Xiangrui Meng <meng@databricks.com>    2016-11-02 11:41:49 -0700
committer  Reynold Xin <rxin@databricks.com>      2016-11-02 11:41:49 -0700
commit     02f203107b8eda1f1576e36c4f12b0e3bc5e910e (patch)
tree       46bba2bdb7045caa74ecd9dd3a6a05b3972e69f2 /core/src
parent     742e0fea5391857964e90d396641ecf95cac4248 (diff)
[SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union
## What changes were proposed in this pull request?

When a user appends a column to a DataFrame using a "nondeterministic" function, e.g., `rand`, `randn`, or `monotonically_increasing_id`, the expected semantics are the following:

- The value in each row should remain unchanged, as if we materialize the column immediately, regardless of later DataFrame operations.

However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values of nondeterministic columns might change after a later `union` or `coalesce`. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which is not necessarily the partition index of the DataFrame where the column was defined. See the unit tests below or the JIRA for examples.

This PR uses the partition index from `RDD.mapPartitionsWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen) and is called right after object creation in `mapPartitionsWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function, so it can be initialized properly.

## How was this patch tested?

Unit tests. (Admittedly, I am not fully confident that this PR fixes all issues without introducing new ones.)

cc: rxin davies

Author: Xiangrui Meng <meng@databricks.com>

Closes #15567 from mengxr/SPARK-14393.
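To make the symptom concrete, here is a minimal, hedged sketch (not code from this PR; the session, seed, and exact output are illustrative only): a seeded `rand` column is expected to keep its per-row values, but before this fix a later `coalesce` changes the partition id returned by `TaskContext.getPartitionId`, so the generated values could shift.

```scala
// Illustrative sketch of the reported behavior (hypothetical session; not from this PR).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

val spark = SparkSession.builder().master("local[4]").appName("SPARK-14393-demo").getOrCreate()

// Append a seeded nondeterministic column to a DataFrame with 4 partitions.
val df = spark.range(0L, 10L, 1L, 4).withColumn("r", rand(42))

// Expected: the values in `r` behave as if the column were materialized here.
df.show()

// Before this fix: the lazily evaluated `rand` reads the *task's* partition id,
// which after coalesce(1) is 0 for every row, so the values in `r` may differ
// from the un-coalesced result.
df.coalesce(1).show()
```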
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala  |  16
1 file changed, 14 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index db535de9e9..e018af35cb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -788,14 +788,26 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * [performance] Spark's internal mapPartitions method which skips closure cleaning. It is a
-   * performance API to be used carefully only if we are sure that the RDD elements are
+   * [performance] Spark's internal mapPartitionsWithIndex method that skips closure cleaning.
+   * It is a performance API to be used carefully only if we are sure that the RDD elements are
    * serializable and don't require closure cleaning.
    *
    * @param preservesPartitioning indicates whether the input function preserves the partitioner,
    * which should be `false` unless this is a pair RDD and the input function doesn't modify
    * the keys.
    */
+  private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
+      f: (Int, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = withScope {
+    new MapPartitionsRDD(
+      this,
+      (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
+      preservesPartitioning)
+  }
+
+  /**
+   * [performance] Spark's internal mapPartitions method that skips closure cleaning.
+   */
   private[spark] def mapPartitionsInternal[U: ClassTag](
       f: Iterator[T] => Iterator[U],
       preservesPartitioning: Boolean = false): RDD[U] = withScope {
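For context, here is a hedged Scala sketch of the pattern this new hook enables (not code from the PR; since `mapPartitionsWithIndexInternal` is `private[spark]`, the sketch uses the public `mapPartitionsWithIndex`, which differs only in that it cleans the closure, and `PartitionSeededEval` is a hypothetical stand-in for a projection holding nondeterministic state): per-partition state is initialized with the RDD's own partition index right after construction, instead of reading `TaskContext.getPartitionId` at evaluation time.

```scala
import scala.util.Random

import org.apache.spark.rdd.RDD

// Hypothetical stand-in for an object (e.g. a projection with nondeterministic
// expressions) that needs per-partition initialization before evaluating rows.
class PartitionSeededEval(seed: Long) {
  private var rng: Random = _
  def initialize(partitionIndex: Int): Unit = {
    // The random stream is keyed to the partition the column was defined on.
    rng = new Random(seed + partitionIndex)
  }
  def eval(value: Long): (Long, Double) = (value, rng.nextDouble())
}

def withSeededColumn(rdd: RDD[Long], seed: Long): RDD[(Long, Double)] =
  rdd.mapPartitionsWithIndex { (index, iter) =>
    val eval = new PartitionSeededEval(seed)
    eval.initialize(index) // initialized right after creation, using this RDD's partition index
    iter.map(eval.eval)
  }
```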