author     Sean Owen <sowen@cloudera.com>        2014-09-30 11:15:38 -0700
committer  Matei Zaharia <matei@databricks.com>  2014-09-30 11:15:38 -0700
commit     ab6dd80ba0f7e1042ea270d10400109a467fe40e (patch)
tree       f18c310909d5abdad8c78f7d1957358d8af8fce6
parent     157e7d0f62eaf016a0c3749065ddcec170540a36 (diff)
[SPARK-3356] [DOCS] Document when RDD elements' ordering within partitions is nondeterministic
As suggested by mateiz, and because it came up on the mailing list again last week, this attempts to document that ordering of elements is not guaranteed across RDD evaluations in groupBy, zip, and partition-wise RDD methods. Suggestions welcome about the wording, or other methods that need a note.

Author: Sean Owen <sowen@cloudera.com>

Closes #2508 from srowen/SPARK-3356 and squashes the following commits:

b7c96fd [Sean Owen] Undo change to programming guide
ad4aeec [Sean Owen] Don't mention ordering in partition-wise methods, reword description of ordering for zip methods per review, and add similar note to programming guide, which mentions groupByKey (but not zip methods)
fce943b [Sean Owen] Note that ordering of elements is not guaranteed across RDD evaluations in groupBy, zip, and partition-wise RDD methods
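To make the documented behavior concrete: a minimal sketch, assuming a live SparkContext named `sc` (illustrative only, not part of the commit):

// Per-group ordering after a shuffle is unspecified, so indices assigned by
// zipWithIndex() over a grouped RDD may change between evaluations.
val grouped = sc.parallelize(1 to 100000, 8)
  .groupBy(_ % 3)                 // RDD[(Int, Iterable[Int])]; Iterable order is undefined
  .flatMap { case (_, vs) => vs } // element order now depends on the shuffle

val first  = grouped.zipWithIndex().collect().toMap
val second = grouped.zipWithIndex().collect().toMap
// first and second are not guaranteed to agree, since each action re-evaluates
// the lineage; sorting first (e.g. sortBy(identity)) pins the assignment.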
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala              | 20
-rw-r--r--  docs/programming-guide.md                                       |  2
3 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 67833743f3..929ded58a3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -420,6 +420,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
+ * The ordering of elements within each group is not guaranteed, and may even differ
+ * each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -439,7 +441,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
- * resulting RDD with into `numPartitions` partitions.
+ * resulting RDD into `numPartitions` partitions. The ordering of elements within
+ * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -535,7 +538,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
- * resulting RDD with the existing partitioner/parallelism level.
+ * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
+ * within each group is not guaranteed, and may even differ each time the resulting RDD is
+ * evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
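Since the note above recommends aggregateByKey or reduceByKey for aggregations, a hedged sketch of the difference (the RDD `pairs` and the SparkContext `sc` are assumed, not part of this patch):

// Summing per key: groupByKey ships every value for a key across the network
// before aggregating, while reduceByKey / aggregateByKey combine map-side first.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val viaGroup  = pairs.groupByKey().mapValues(_.sum)   // shuffles all values
val viaReduce = pairs.reduceByKey(_ + _)              // shuffles partial sums
val viaAgg    = pairs.aggregateByKey(0)(_ + _, _ + _) // same, with an explicit zero value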
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ba712c9d77..ab9e97c8fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -509,7 +509,8 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
- * mapping to that key.
+ * mapping to that key. The ordering of elements within each group is not guaranteed, and
+ * may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -520,7 +521,8 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
- * mapping to that key.
+ * mapping to that key. The ordering of elements within each group is not guaranteed, and
+ * may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -531,7 +533,8 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
- * mapping to that key.
+ * mapping to that key. The ordering of elements within each group is not guaranteed, and
+ * may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
@@ -1028,8 +1031,14 @@ abstract class RDD[T: ClassTag](
* Zips this RDD with its element indices. The ordering is first based on the partition index
* and then the ordering of items within each partition. So the first item in the first
* partition gets index 0, and the last item in the last partition receives the largest index.
+ *
* This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
* This method needs to trigger a Spark job when this RDD contains more than one partition.
+ *
+ * Note that some RDDs, such as those returned by groupBy(), do not guarantee order of
+ * elements in a partition. The index assigned to each element is therefore not guaranteed,
+ * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
+ * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this)
@@ -1037,6 +1046,11 @@ abstract class RDD[T: ClassTag](
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
+ *
+ * Note that some RDDs, such as those returned by groupBy(), do not guarantee order of
+ * elements in a partition. The unique ID assigned to each element is therefore not guaranteed,
+ * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
+ * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
def zipWithUniqueId(): RDD[(T, Long)] = {
val n = this.partitions.size.toLong
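A small sketch of the id scheme described above (data and names are illustrative; assumes a SparkContext `sc`):

// With n = 3 partitions, items in partition k receive ids k, n+k, 2*n+k, ...
val letters = sc.parallelize(Seq("a", "b", "c", "d", "e", "f"), 3)
letters.zipWithUniqueId().collect()
// partition 0 -> ("a",0), ("b",3); partition 1 -> ("c",1), ("d",4);
// partition 2 -> ("e",2), ("f",5)

// As the note warns, the ids are only as stable as the element order; to pin
// them across evaluations, impose an order first:
val stable = letters.sortBy(identity).zipWithIndex()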
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 510b47a2aa..1d61a3c555 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -883,7 +883,7 @@ for details.
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
- <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
+ <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
average) over each key, using <code>reduceByKey</code> or <code>combineByKey</code> will yield much better
performance.
<br />