author     Ilya Ganelin <ilya.ganelin@capitalone.com>  2015-03-30 11:52:02 +0100
committer  Sean Owen <sowen@cloudera.com>  2015-03-30 11:54:01 +0100
commit     4bdfb7bab3b9d20167571d9b6888a2a44d9d43fc (patch)
tree       d66ff730b271cf0de4fbc540beb7a3f6f3793f41 /docs
parent     de6733036e060e18b0d1f21f9365bda81132a1a2 (diff)
download   spark-4bdfb7bab3b9d20167571d9b6888a2a44d9d43fc.tar.gz
           spark-4bdfb7bab3b9d20167571d9b6888a2a44d9d43fc.tar.bz2
           spark-4bdfb7bab3b9d20167571d9b6888a2a44d9d43fc.zip
[SPARK-5750][SPARK-3441][SPARK-5836][CORE] Added documentation explaining shuffle
I've updated the Spark Programming Guide to add a section on the shuffle operation, providing some background on what it does. I've also addressed some of its performance impacts.
I've included documentation to address the following issues:
https://issues.apache.org/jira/browse/SPARK-5836
https://issues.apache.org/jira/browse/SPARK-3441
https://issues.apache.org/jira/browse/SPARK-5750
https://issues.apache.org/jira/browse/SPARK-4227 is related but can be addressed in a separate PR since it involves updates to the Spark Configuration Guide.
Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Author: Ilya Ganelin <ilganeli@gmail.com>
Closes #5074 from ilganeli/SPARK-5750 and squashes the following commits:
6178e24 [Ilya Ganelin] Update programming-guide.md
7a0b96f [Ilya Ganelin] Update programming-guide.md
2c5df08 [Ilya Ganelin] Merge branch 'SPARK-5750' of github.com:ilganeli/spark into SPARK-5750
dffbd2d [Ilya Ganelin] [SPARK-5750] Slight wording update
1ff4eb4 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750
85f9c6e [Ilya Ganelin] Update programming-guide.md
349d1fa [Ilya Ganelin] Added cross link for configuration page
eeb5a7a [Ilya Ganelin] [SPARK-5750] Added some minor fixes
dd5cc9d [Ilya Ganelin] [SPARK-5750] Fixed some factual inaccuracies with regards to shuffle internals.
a8adb57 [Ilya Ganelin] [SPARK-5750] Incorporated feedback from Sean Owen
9954bbe [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750
159dd1c [Ilya Ganelin] [SPARK-5750] Style fixes from rxin.
75ef67b [Ilya Ganelin] [SPARK-5750][SPARK-3441][SPARK-5836] Added documentation explaining the shuffle operation and included errata from a number of other JIRAs
Diffstat (limited to 'docs')
-rw-r--r--  docs/programming-guide.md  83
1 file changed, 72 insertions, 11 deletions
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index f5b775da79..f4fabb0927 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -937,7 +937,7 @@ for details.
   <td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td>
 </tr>
 <tr>
-  <td> <b>mapPartitions</b>(<i>func</i>) </td>
+  <td> <b>mapPartitions</b>(<i>func</i>) <a name="MapPartLink"></a> </td>
   <td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. </td>
 </tr>
@@ -964,7 +964,7 @@ for details.
   <td> Return a new dataset that contains the distinct elements of the source dataset.</td>
 </tr>
 <tr>
-  <td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
+  <td> <b>groupByKey</b>([<i>numTasks</i>]) <a name="GroupByLink"></a> </td>
   <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. <br />
     <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
@@ -975,25 +975,25 @@ for details.
   </td>
 </tr>
 <tr>
-  <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
+  <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) <a name="ReduceByLink"></a> </td>
   <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
 </tr>
 <tr>
-  <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>
+  <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) <a name="AggregateByLink"></a> </td>
   <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
 </tr>
 <tr>
-  <td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) </td>
+  <td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) <a name="SortByLink"></a> </td>
   <td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
 </tr>
 <tr>
-  <td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
+  <td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="JoinLink"></a> </td>
   <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>. </td>
 </tr>
 <tr>
-  <td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
+  <td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="CogroupLink"></a> </td>
   <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called <code>groupWith</code>. </td>
 </tr>
 <tr>
@@ -1006,17 +1006,17 @@ for details.
   process's stdin and lines output to its stdout are returned as an RDD of strings.
   </td>
 </tr>
 <tr>
-  <td> <b>coalesce</b>(<i>numPartitions</i>) </td>
+  <td> <b>coalesce</b>(<i>numPartitions</i>) <a name="CoalesceLink"></a> </td>
   <td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. </td>
 </tr>
 <tr>
   <td> <b>repartition</b>(<i>numPartitions</i>) </td>
   <td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
-    This always shuffles all data over the network. </td>
+    This always shuffles all data over the network. <a name="RepartitionLink"></a></td>
 </tr>
 <tr>
-  <td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) </td>
+  <td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td>
   <td> Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within each partition because it can push the sorting down into the shuffle machinery. </td>
@@ -1080,7 +1080,7 @@ for details.
   <code>SparkContext.objectFile()</code>. </td>
 </tr>
 <tr>
-  <td> <b>countByKey</b>() </td>
+  <td> <b>countByKey</b>() <a name="CountByLink"></a> </td>
   <td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td>
 </tr>
 <tr>
@@ -1090,6 +1090,67 @@ for details.
 </tr>
 </table>
 
+### Shuffle operations
+
+Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
+mechanism for re-distributing data so that it's grouped differently across partitions. This typically
+involves copying data across executors and machines, making the shuffle a complex and
+costly operation.
+
+#### Background
+
+To understand what happens during the shuffle we can consider the example of the
+[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
+values for a single key are combined into a tuple - the key and the result of executing a reduce
+function against all values associated with that key. The challenge is that not all values for a
+single key necessarily reside on the same partition, or even the same machine, but they must be
+co-located to compute the result.
+
+In Spark, data is generally not distributed across partitions to be in the necessary place for a
+specific operation. During computations, a single task will operate on a single partition - thus, to
+organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
+all-to-all operation. It must read from all partitions to find all the values for all keys,
+and then bring together values across partitions to compute the final result for each key -
+this is called the **shuffle**.
+
+Although the set of elements in each partition of newly shuffled data will be deterministic, and so
+is the ordering of partitions themselves, the ordering of these elements is not. If one desires
+predictably ordered data following shuffle then it's possible to use:
+
+* `mapPartitions` to sort each partition using, for example, `.sorted`
+* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
+* `sortBy` to make a globally ordered RDD
+
+Operations which can cause a shuffle include **repartition** operations like
+[`repartition`](#RepartitionLink) and [`coalesce`](#CoalesceLink), **'ByKey** operations
+(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
+**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).
+
+#### Performance Impact
+The **Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
+network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
+organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
+MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
+
+Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
+are sorted based on the target partition and written to a single file. On the reduce side, tasks
+read the relevant sorted blocks.
+
+Certain shuffle operations can consume significant amounts of heap memory since they employ
+in-memory data structures to organize records before or after transferring them. Specifically,
+`reduceByKey` and `aggregateByKey` create these structures on the map side and `'ByKey` operations
+generate these on the reduce side. When data does not fit in memory Spark will spill these tables
+to disk, incurring the additional overhead of disk I/O and increased garbage collection.
+
+Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
+are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
+long-running Spark jobs may consume all available disk space. This is done so the shuffle files
+don't need to be re-created if the lineage is re-computed. The temporary storage directory is
+specified by the `spark.local.dir` configuration parameter when configuring the Spark context.
+
+Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
+'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).
+
 ## RDD Persistence
 
 One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory