author    Ilya Ganelin <ilya.ganelin@capitalone.com>  2015-03-11 13:20:15 +0000
committer Sean Owen <sowen@cloudera.com>  2015-03-11 13:20:15 +0000
commit    548643a9e4690b69e2a496cdcd0a426b6de8d8b5 (patch)
tree      a53a56c05984859125c045677871a79c1de0f977 /docs/programming-guide.md
parent    5b335bdda3efb7c6a5b18b4eeff189064c11e6c3 (diff)
[SPARK-4423] Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior
Hi all - I've added a writeup on how closures work within Spark to help clarify the general case for this problem and similar problems. I hope this addresses the issue and would love any feedback.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>

Closes #4696 from ilganeli/SPARK-4423 and squashes the following commits:

c5dc498 [Ilya Ganelin] Fixed typo
07b78e8 [Ilya Ganelin] Updated to fix capitalization
48c1983 [Ilya Ganelin] Updated to fix capitalization and clarify wording
2fd2a07 [Ilya Ganelin] Incorporated a few more minor fixes. Fixed a bug in python code. Added semicolons for java
4772f99 [Ilya Ganelin] Incorporated latest feedback
448bd79 [Ilya Ganelin] Updated some verbiage and added section links
5dbbda5 [Ilya Ganelin] Improved some wording
d374d3a [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-4423
2600668 [Ilya Ganelin] Minor edits
c768ab2 [Ilya Ganelin] Updated documentation to add a section on closures. This helps understand confusing behavior of foreach and map functions when attempting to modify variables outside of the scope of an RDD action or transformation
Diffstat (limited to 'docs/programming-guide.md')
-rw-r--r--  docs/programming-guide.md  72
1 file changed, 68 insertions(+), 4 deletions(-)
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index fa0b4e3705..c011a8404f 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -725,7 +725,7 @@ class MyClass(object):
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
- return rdd.map(lambda s: self.field + x)
+ return rdd.map(lambda s: self.field + s)
{% endhighlight %}
To avoid this issue, the simplest way is to copy `field` into a local variable instead
@@ -734,13 +734,76 @@ of accessing it externally:
{% highlight python %}
def doStuff(self, rdd):
field = self.field
- return rdd.map(lambda s: field + x)
+ return rdd.map(lambda s: field + s)
{% endhighlight %}
</div>
</div>
+### Understanding closures <a name="ClosuresLink"></a>
+
+One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we'll look at code that uses `foreach()` to increment a counter, but similar issues can occur for other operations as well.
+
+#### Example
+
+Consider the naive RDD element sum below, which behaves completely differently depending on whether execution is happening within the same JVM as the driver. A common example of this is running Spark in `local` mode (`--master local[n]`) versus deploying a Spark application to a cluster (e.g. via `spark-submit` to YARN):
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+var counter = 0
+var rdd = sc.parallelize(data)
+
+// Wrong: Don't do this!!
+rdd.foreach(x => counter += x)
+
+println("Counter value: " + counter)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+int counter = 0;
+JavaRDD<Integer> rdd = sc.parallelize(data);
+
+// Wrong: Don't do this!!
+rdd.foreach(x -> counter += x);
+
+println("Counter value: " + counter);
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+counter = 0
+rdd = sc.parallelize(data)
+
+# Wrong: Don't do this!!
+def increment_counter(x):
+    global counter
+    counter += x
+rdd.foreach(increment_counter)
+
+print("Counter value: %i" % counter)
+{% endhighlight %}
+</div>
+
+</div>
+
+#### Local vs. cluster modes
+
+The primary challenge is that the behavior of the above code is undefined. In local mode with a single JVM, the above code will sum the values within the RDD and store the result in **counter**. This is because both the RDD and the variable **counter** are in the same memory space on the driver node.
+
+However, in `cluster` mode, what happens is more complicated, and the above may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the **closure**: those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case `foreach()`). This closure is serialized and sent to each executor. In `local` mode, there is only the one executor, so everything shares the same closure. In other modes, however, this is not the case, and the executors running on separate worker nodes each have their own copy of the closure.
+
+What is happening here is that the variables within the closure sent to each executor are now copies, so when **counter** is referenced within the `foreach` function, it is no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node, but it is no longer visible to the executors! The executors see only the copy from the serialized closure. Thus, the final value of **counter** will still be zero, since all operations on **counter** were referencing the value within the serialized closure.
+
+To ensure well-defined behavior in these sorts of scenarios, one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The [Accumulators](#AccumLink) section of this guide discusses these in more detail; a minimal sketch of the pattern is shown below.
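+
+For illustration, here is a sketch of an accumulator-based version of the sum above, using the `SparkContext.accumulator` API available at the time of writing (it assumes `data` is the same numeric collection as in the earlier examples):
+
+{% highlight scala %}
+// Create an accumulator on the driver, initialized to 0
+val accum = sc.accumulator(0)
+val rdd = sc.parallelize(data)
+
+// Safe: each task adds to its local copy of the accumulator, and Spark
+// merges the per-task updates back into the driver's copy
+rdd.foreach(x => accum += x)
+
+// The merged value is only reliably readable on the driver
+println("Counter value: " + accum.value)
+{% endhighlight %}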
+
+In general, closures (constructs like loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but only by accident; such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
+
+#### Printing elements of an RDD
+
+Another common idiom is attempting to print out the elements of an RDD using `rdd.foreach(println)` or `rdd.map(println)`. On a single machine, this will generate the expected output and print all of the RDD's elements. In `cluster` mode, however, the output to `stdout` is written by each executor to its own `stdout`, not the driver's, so `stdout` on the driver won't show these! To print all elements on the driver, one can first use the `collect()` method to bring the RDD to the driver node: `rdd.collect().foreach(println)`. This can cause the driver to run out of memory, though, because `collect()` fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use `take()`: `rdd.take(100).foreach(println)`. A short sketch of both options follows.
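+
+A sketch of the two driver-side printing approaches (assuming the `rdd` from the examples above):
+
+{% highlight scala %}
+// Brings the entire RDD to the driver first -- only safe for small RDDs
+rdd.collect().foreach(println)
+
+// Safer: fetch only the first 100 elements to the driver
+rdd.take(100).foreach(println)
+{% endhighlight %}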
+
### Working with Key-Value Pairs
<div class="codetabs">
@@ -1018,7 +1081,8 @@ for details.
</tr>
<tr>
<td> <b>foreach</b>(<i>func</i>) </td>
- <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems. </td>
+ <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#AccumLink">Accumulator</a> or interacting with external storage systems.
+ <br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#ClosuresLink">Understanding closures</a> for more details.</td>
</tr>
</table>
@@ -1191,7 +1255,7 @@ run on the cluster so that `v` is not shipped to the nodes more than once. In ad
`v` should not be modified after it is broadcast in order to ensure that all nodes get the same
value of the broadcast variable (e.g. if the variable is shipped to a new node later).
-## Accumulators
+## Accumulators <a name="AccumLink"></a>
Accumulators are variables that are only "added" to through an associative operation and can
therefore be efficiently supported in parallel. They can be used to implement counters (as in