aboutsummaryrefslogtreecommitdiff
path: root/docs/programming-guide.md
diff options
context:
space:
mode:
authorIlya Ganelin <ilya.ganelin@capitalone.com>2015-03-30 11:52:02 +0100
committerSean Owen <sowen@cloudera.com>2015-03-30 11:54:01 +0100
commit4bdfb7bab3b9d20167571d9b6888a2a44d9d43fc (patch)
treed66ff730b271cf0de4fbc540beb7a3f6f3793f41 /docs/programming-guide.md
parentde6733036e060e18b0d1f21f9365bda81132a1a2 (diff)
downloadspark-4bdfb7bab3b9d20167571d9b6888a2a44d9d43fc.tar.gz
spark-4bdfb7bab3b9d20167571d9b6888a2a44d9d43fc.tar.bz2
spark-4bdfb7bab3b9d20167571d9b6888a2a44d9d43fc.zip
[SPARK-5750][SPARK-3441][SPARK-5836][CORE] Added documentation explaining shuffle
I've updated the Spark Programming Guide to add a section on the shuffle operation providing some background on what it does. I've also addressed some of its performance impacts. I've included documentation to address the following issues: https://issues.apache.org/jira/browse/SPARK-5836 https://issues.apache.org/jira/browse/SPARK-3441 https://issues.apache.org/jira/browse/SPARK-5750 https://issues.apache.org/jira/browse/SPARK-4227 is related but can be addressed in a separate PR since it involves updates to the Spark Configuration Guide. Author: Ilya Ganelin <ilya.ganelin@capitalone.com> Author: Ilya Ganelin <ilganeli@gmail.com> Closes #5074 from ilganeli/SPARK-5750 and squashes the following commits: 6178e24 [Ilya Ganelin] Update programming-guide.md 7a0b96f [Ilya Ganelin] Update programming-guide.md 2c5df08 [Ilya Ganelin] Merge branch 'SPARK-5750' of github.com:ilganeli/spark into SPARK-5750 dffbd2d [Ilya Ganelin] [SPARK-5750] Slight wording update 1ff4eb4 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750 85f9c6e [Ilya Ganelin] Update programming-guide.md 349d1fa [Ilya Ganelin] Added cross linkf or configuration page eeb5a7a [Ilya Ganelin] [SPARK-5750] Added some minor fixes dd5cc9d [Ilya Ganelin] [SPARK-5750] Fixed some factual inaccuracies with regards to shuffle internals. a8adb57 [Ilya Ganelin] [SPARK-5750] Incoporated feedback from Sean Owen 9954bbe [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750 159dd1c [Ilya Ganelin] [SPARK-5750] Style fixes from rxin. 75ef67b [Ilya Ganelin] [SPARK-5750][SPARK-3441][SPARK-5836] Added documentation explaining the shuffle operation and included errata from a number of other JIRAs
Diffstat (limited to 'docs/programming-guide.md')
-rw-r--r--docs/programming-guide.md83
1 files changed, 72 insertions, 11 deletions
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index f5b775da79..f4fabb0927 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -937,7 +937,7 @@ for details.
<td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td>
</tr>
<tr>
- <td> <b>mapPartitions</b>(<i>func</i>) </td>
+ <td> <b>mapPartitions</b>(<i>func</i>) <a name="MapPartLink"></a> </td>
<td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type
Iterator&lt;T&gt; => Iterator&lt;U&gt; when running on an RDD of type T. </td>
</tr>
@@ -964,7 +964,7 @@ for details.
<td> Return a new dataset that contains the distinct elements of the source dataset.</td>
</tr>
<tr>
- <td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
+ <td> <b>groupByKey</b>([<i>numTasks</i>]) <a name="GroupByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
@@ -975,25 +975,25 @@ for details.
</td>
</tr>
<tr>
- <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
+ <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) <a name="ReduceByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
- <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>
+ <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) <a name="AggregateByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
- <td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) </td>
+ <td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) <a name="SortByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
</tr>
<tr>
- <td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
+ <td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="JoinLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
</td>
</tr>
<tr>
- <td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
+ <td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="CogroupLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples. This operation is also called <code>groupWith</code>. </td>
</tr>
<tr>
@@ -1006,17 +1006,17 @@ for details.
process's stdin and lines output to its stdout are returned as an RDD of strings. </td>
</tr>
<tr>
- <td> <b>coalesce</b>(<i>numPartitions</i>) </td>
+ <td> <b>coalesce</b>(<i>numPartitions</i>) <a name="CoalesceLink"></a> </td>
<td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
after filtering down a large dataset. </td>
</tr>
<tr>
<td> <b>repartition</b>(<i>numPartitions</i>) </td>
<td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
- This always shuffles all data over the network. </td>
+ This always shuffles all data over the network. <a name="RepartitionLink"></a></td>
</tr>
<tr>
- <td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) </td>
+ <td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td>
<td> Repartition the RDD according to the given partitioner and, within each resulting partition,
sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
each partition because it can push the sorting down into the shuffle machinery. </td>
@@ -1080,7 +1080,7 @@ for details.
<code>SparkContext.objectFile()</code>. </td>
</tr>
<tr>
- <td> <b>countByKey</b>() </td>
+ <td> <b>countByKey</b>() <a name="CountByLink"></a> </td>
<td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td>
</tr>
<tr>
@@ -1090,6 +1090,67 @@ for details.
</tr>
</table>
+### Shuffle operations
+
+Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
+mechanism for re-distributing data so that is grouped differently across partitions. This typically
+involves copying data across executors and machines, making the shuffle a complex and
+costly operation.
+
+#### Background
+
+To understand what happens during the shuffle we can consider the example of the
+[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
+values for a single key are combined into a tuple - the key and the result of executing a reduce
+function against all values associated with that key. The challenge is that not all values for a
+single key necessarily reside on the same partition, or even the same machine, but they must be
+co-located to compute the result.
+
+In Spark, data is generally not distributed across partitions to be in the necessary place for a
+specific operation. During computations, a single task will operate on a single partition - thus, to
+organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
+all-to-all operation. It must read from all partitions to find all the values for all keys,
+and then bring together values across partitions to compute the final result for each key -
+this is called the **shuffle**.
+
+Although the set of elements in each partition of newly shuffled data will be deterministic, and so
+is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably
+ordered data following shuffle then it's possible to use:
+
+* `mapPartitions` to sort each partition using, for example, `.sorted`
+* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
+* `sortBy` to make a globally ordered RDD
+
+Operations which can cause a shuffle include **repartition** operations like
+[`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'ByKey** operations
+(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
+**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).
+
+#### Performance Impact
+The **Shuffle** is an expensive operation since it involves disk I/O, data serialization, and
+network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
+organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
+MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
+
+Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
+are sorted based on the target partition and written to a single file. On the reduce side, tasks
+read the relevant sorted blocks.
+
+Certain shuffle operations can consume significant amounts of heap memory since they employ
+in-memory data structures to organize records before or after transferring them. Specifically,
+`reduceByKey` and `aggregateByKey` create these structures on the map side and `'ByKey` operations
+generate these on the reduce side. When data does not fit in memory Spark will spill these tables
+to disk, incurring the additional overhead of disk I/O and increased garbage collection.
+
+Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
+are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
+long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need
+to be re-computed if the lineage is re-computed. The temporary storage directory is specified by the
+`spark.local.dir` configuration parameter when configuring the Spark context.
+
+Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
+'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).
+
## RDD Persistence
One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory