path: root/docs/programming-guide.md
author     Jacek Laskowski <jacek.laskowski@deepsense.io>    2015-09-21 19:46:39 +0100
committer  Sean Owen <sowen@cloudera.com>                    2015-09-21 19:46:39 +0100
commit     ca9fe540fe04e2e230d1e76526b5502bab152914 (patch)
tree       48b2bde988e1162e2528aae9452f1b84d3680148 /docs/programming-guide.md
parent     ebbf85f07bb8de0d566f1ae4b41f26421180bebe (diff)
[SPARK-10662] [DOCS] Code snippets are not properly formatted in tables
* Backticks are processed properly in Spark Properties table
* Removed unnecessary spaces
* See http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/running-on-yarn.html

Author: Jacek Laskowski <jacek.laskowski@deepsense.io>

Closes #8795 from jaceklaskowski/docs-yarn-formatting.
Diffstat (limited to 'docs/programming-guide.md')
-rw-r--r--  docs/programming-guide.md | 100
1 file changed, 50 insertions(+), 50 deletions(-)
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 4cf83bb392..8ad238315f 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -182,8 +182,8 @@ in-process.
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called `sc`. Making your own SparkContext will not work. You can set which master the
context connects to using the `--master` argument, and you can add JARs to the classpath
-by passing a comma-separated list to the `--jars` argument. You can also add dependencies
-(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
+by passing a comma-separated list to the `--jars` argument. You can also add dependencies
+(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
can be passed to the `--repositories` argument. For example, to run `bin/spark-shell` on exactly
four cores, use:
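
For reference, the invocations described above look roughly like this (a sketch; the jar name and Maven coordinates are placeholders):

{% highlight bash %}
$ ./bin/spark-shell --master local[4]

# also add a local jar to the classpath
$ ./bin/spark-shell --master local[4] --jars code.jar

# pull in a dependency by its Maven coordinates
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
{% endhighlight %}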
@@ -217,7 +217,7 @@ context connects to using the `--master` argument, and you can add Python .zip,
to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
-can be passed to the `--repositories` argument. Any python dependencies a Spark Package has (listed in
+can be passed to the `--repositories` argument. Any Python dependencies a Spark Package has (listed in
the requirements.txt of that package) must be manually installed using pip when necessary.
For example, to run `bin/pyspark` on exactly four cores, use:
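
A rough illustration (the `code.py` file is a placeholder):

{% highlight bash %}
$ ./bin/pyspark --master local[4]

# also ship a local Python file to the executors
$ ./bin/pyspark --master local[4] --py-files code.py
{% endhighlight %}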
@@ -249,8 +249,8 @@ the [IPython Notebook](http://ipython.org/notebook.html) with PyLab plot support
$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark
{% endhighlight %}
-After the IPython Notebook server is launched, you can create a new "Python 2" notebook from
-the "Files" tab. Inside the notebook, you can input the command `%pylab inline` as part of
+After the IPython Notebook server is launched, you can create a new "Python 2" notebook from
+the "Files" tab. Inside the notebook, you can input the command `%pylab inline` as part of
your notebook before you start to try Spark from the IPython notebook.
</div>
@@ -418,9 +418,9 @@ Apart from text files, Spark's Python API also supports several other data forma
**Writable Support**
-PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the
-resulting Java objects using [Pyrolite](https://github.com/irmen/Pyrolite/). When saving an RDD of key-value pairs to SequenceFile,
-PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following
+PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the
+resulting Java objects using [Pyrolite](https://github.com/irmen/Pyrolite/). When saving an RDD of key-value pairs to SequenceFile,
+PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following
Writables are automatically converted:
<table class="table">
@@ -435,9 +435,9 @@ Writables are automatically converted:
<tr><td>MapWritable</td><td>dict</td></tr>
</table>
-Arrays are not handled out-of-the-box. Users need to specify custom `ArrayWritable` subtypes when reading or writing. When writing,
-users also need to specify custom converters that convert arrays to custom `ArrayWritable` subtypes. When reading, the default
-converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, which then get pickled to Python tuples. To get
+Arrays are not handled out-of-the-box. Users need to specify custom `ArrayWritable` subtypes when reading or writing. When writing,
+users also need to specify custom converters that convert arrays to custom `ArrayWritable` subtypes. When reading, the default
+converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, which then get pickled to Python tuples. To get
Python `array.array` for arrays of primitive types, users need to specify custom converters.
**Saving and Loading SequenceFiles**
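
A minimal PySpark sketch of the round trip (the path is a placeholder; for standard Writables the key and value classes are inferred automatically):

{% highlight python %}
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
{% endhighlight %}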
@@ -454,7 +454,7 @@ classes can be specified, but for standard Writables this is not required.
**Saving and Loading Other Hadoop Input/Output Formats**
-PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both 'new' and 'old' Hadoop MapReduce APIs.
+PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both 'new' and 'old' Hadoop MapReduce APIs.
If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the
Elasticsearch ESInputFormat:
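
A hedged sketch of what such a call can look like, assuming the elasticsearch-hadoop jar is on the classpath and an index named `index/type` exists on a locally running Elasticsearch (adjust the jar path and resource name to your setup):

{% highlight python %}
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource": "index/type"}  # Hadoop configuration passed as a Python dict
>>> rdd = sc.newAPIHadoopRDD(
...     "org.elasticsearch.hadoop.mr.EsInputFormat",
...     "org.apache.hadoop.io.NullWritable",
...     "org.elasticsearch.hadoop.mr.LinkedMapWritable",
...     conf=conf)
>>> rdd.first()  # the MapWritable values are converted to Python dicts
{% endhighlight %}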
@@ -474,15 +474,15 @@ Note that, if the InputFormat simply depends on a Hadoop configuration and/or in
the key and value classes can easily be converted according to the above table,
then this approach should work well for such cases.
-If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to
+If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to
transform that data on the Scala/Java side to something which can be handled by Pyrolite's pickler.
-A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) trait is provided
-for this. Simply extend this trait and implement your transformation code in the ```convert```
-method. Remember to ensure that this class, along with any dependencies required to access your ```InputFormat```, are packaged into your Spark job jar and included on the PySpark
+A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) trait is provided
+for this. Simply extend this trait and implement your transformation code in the ```convert```
+method. Remember to ensure that this class, along with any dependencies required to access your ```InputFormat```, is packaged into your Spark job jar and included on the PySpark
classpath.
-See the [Python examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python) and
-the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters)
+See the [Python examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python) and
+the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters)
for examples of using Cassandra / HBase ```InputFormat``` and ```OutputFormat``` with custom converters.
</div>
@@ -758,7 +758,7 @@ One of the harder things about Spark is understanding the scope and life cycle o
#### Example
-Consider the naive RDD element sum below, which behaves completely differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in `local` mode (`--master = local[n]`) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):
+Consider the naive RDD element sum below, which behaves completely differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in `local` mode (`--master = local[n]`) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):
<div class="codetabs">
@@ -777,7 +777,7 @@ println("Counter value: " + counter)
<div data-lang="java" markdown="1">
{% highlight java %}
int counter = 0;
-JavaRDD<Integer> rdd = sc.parallelize(data);
+JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
@@ -803,7 +803,7 @@ print("Counter value: " + counter)
#### Local vs. cluster modes
-The primary challenge is that the behavior of the above code is undefined. In local mode with a single JVM, the above code will sum the values within the RDD and store it in **counter**. This is because both the RDD and the variable **counter** are in the same memory space on the driver node.
+The primary challenge is that the behavior of the above code is undefined. In local mode with a single JVM, the above code will sum the values within the RDD and store it in **counter**. This is because both the RDD and the variable **counter** are in the same memory space on the driver node.
However, in `cluster` mode, what happens is more complicated, and the above may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the **closure**. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case `foreach()`). This closure is serialized and sent to each executor. In `local` mode, there is only one executor, so everything shares the same closure. In other modes, however, this is not the case, and the executors running on separate worker nodes each have their own copy of the closure.
@@ -813,9 +813,9 @@ To ensure well-defined behavior in these sorts of scenarios one should use an [`
In general, closures (constructs like loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident, and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
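
For reference, a minimal Python sketch of the accumulator-based alternative to the broken counter above (assuming an active `SparkContext` named `sc` and a list `data`):

{% highlight python %}
counter = sc.accumulator(0)
rdd = sc.parallelize(data)

def increment_counter(x):
    counter.add(x)  # accumulator updates are sent back to the driver

rdd.foreach(increment_counter)
print("Counter value: %s" % counter.value)
{% endhighlight %}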
-#### Printing elements of an RDD
+#### Printing elements of an RDD
Another common idiom is attempting to print out the elements of an RDD using `rdd.foreach(println)` or `rdd.map(println)`. On a single machine, this will generate the expected output and print all the RDD's elements. However, in `cluster` mode, the `stdout` being written to by the executors is the executor's own `stdout`, not the one on the driver, so `stdout` on the driver won't show these! To print all elements on the driver, one can use the `collect()` method to first bring the RDD to the driver node thus: `rdd.collect().foreach(println)`. This can cause the driver to run out of memory, though, because `collect()` fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use `take()`: `rdd.take(100).foreach(println)`.
-
+
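
In Python, a bounded version of the same idea looks roughly like this (a sketch, assuming `rdd` already exists):

{% highlight python %}
# print only a bounded sample on the driver
for element in rdd.take(100):
    print(element)

# rdd.collect() works too, but pulls the entire RDD into driver memory
{% endhighlight %}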
### Working with Key-Value Pairs
<div class="codetabs">
@@ -859,7 +859,7 @@ only available on RDDs of key-value pairs.
The most common ones are distributed "shuffle" operations, such as grouping or aggregating the elements
by a key.
-In Java, key-value pairs are represented using the
+In Java, key-value pairs are represented using the
[scala.Tuple2](http://www.scala-lang.org/api/{{site.SCALA_VERSION}}/index.html#scala.Tuple2) class
from the Scala standard library. You can simply call `new Tuple2(a, b)` to create a tuple, and access
its fields later with `tuple._1()` and `tuple._2()`.
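
In Python, key-value pairs are ordinary 2-tuples; a small sketch of the common counting pattern (assuming a local `data.txt` file exists):

{% highlight python %}
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))          # each line becomes a (line, 1) pair
counts = pairs.reduceByKey(lambda a, b: a + b)  # sum the counts per distinct line
{% endhighlight %}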
@@ -974,7 +974,7 @@ for details.
<td> <b>groupByKey</b>([<i>numTasks</i>]) <a name="GroupByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
- average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
+ average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
performance.
<br />
<b>Note:</b> By default, the level of parallelism in the output depends on the number of partitions of the parent RDD.
@@ -1025,7 +1025,7 @@ for details.
<tr>
<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td>
<td> Repartition the RDD according to the given partitioner and, within each resulting partition,
- sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
+ sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
each partition because it can push the sorting down into the shuffle machinery. </td>
</tr>
</table>
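
To make the `repartitionAndSortWithinPartitions` entry above concrete, a hedged Python sketch (the pairs are made up):

{% highlight python %}
pairs = sc.parallelize([(3, "c"), (1, "a"), (2, "b"), (1, "aa")])

# repartition into 2 partitions and sort each partition by key in a single shuffle pass
resorted = pairs.repartitionAndSortWithinPartitions(numPartitions=2)
resorted.glom().collect()  # each partition comes back sorted by key
{% endhighlight %}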
@@ -1038,7 +1038,7 @@ RDD API doc
[Java](api/java/index.html?org/apache/spark/api/java/JavaRDD.html),
[Python](api/python/pyspark.html#pyspark.RDD),
[R](api/R/index.html))
-
+
and pair RDD functions doc
([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions),
[Java](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))
@@ -1094,7 +1094,7 @@ for details.
</tr>
<tr>
<td> <b>foreach</b>(<i>func</i>) </td>
- <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#AccumLink">Accumulator</a> or interacting with external storage systems.
+ <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#AccumLink">Accumulator</a> or interacting with external storage systems.
<br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#ClosuresLink">Understanding closures </a> for more details.</td>
</tr>
</table>
@@ -1118,13 +1118,13 @@ co-located to compute the result.
In Spark, data is generally not distributed across partitions to be in the necessary place for a
specific operation. During computations, a single task will operate on a single partition - thus, to
organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
-all-to-all operation. It must read from all partitions to find all the values for all keys,
-and then bring together values across partitions to compute the final result for each key -
+all-to-all operation. It must read from all partitions to find all the values for all keys,
+and then bring together values across partitions to compute the final result for each key -
this is called the **shuffle**.
Although the set of elements in each partition of newly shuffled data will be deterministic, and so
-is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably
-ordered data following shuffle then it's possible to use:
+is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably
+ordered data following shuffle then it's possible to use:
* `mapPartitions` to sort each partition using, for example, `.sorted`
* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
@@ -1141,26 +1141,26 @@ network I/O. To organize data for the shuffle, Spark generates sets of tasks - *
organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
-Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
-are sorted based on the target partition and written to a single file. On the reduce side, tasks
+Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
+are sorted based on the target partition and written to a single file. On the reduce side, tasks
read the relevant sorted blocks.
-
-Certain shuffle operations can consume significant amounts of heap memory since they employ
-in-memory data structures to organize records before or after transferring them. Specifically,
-`reduceByKey` and `aggregateByKey` create these structures on the map side, and `'ByKey` operations
-generate these on the reduce side. When data does not fit in memory Spark will spill these tables
+
+Certain shuffle operations can consume significant amounts of heap memory since they employ
+in-memory data structures to organize records before or after transferring them. Specifically,
+`reduceByKey` and `aggregateByKey` create these structures on the map side, and `'ByKey` operations
+generate these on the reduce side. When data does not fit in memory Spark will spill these tables
to disk, incurring the additional overhead of disk I/O and increased garbage collection.
Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
-are preserved until the corresponding RDDs are no longer used and are garbage collected.
-This is done so the shuffle files don't need to be re-created if the lineage is re-computed.
-Garbage collection may happen only after a long period time, if the application retains references
-to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may
+are preserved until the corresponding RDDs are no longer used and are garbage collected.
+This is done so the shuffle files don't need to be re-created if the lineage is re-computed.
+Garbage collection may happen only after a long period of time, if the application retains references
+to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may
consume a large amount of disk space. The temporary storage directory is specified by the
`spark.local.dir` configuration parameter when configuring the Spark context.
Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
-'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).
+'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).
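
As a hedged sketch (the paths and values are placeholders), such settings are ordinary Spark configuration entries applied before the context is created:

{% highlight python %}
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("shuffle-tuning-sketch")
        .set("spark.local.dir", "/mnt/spark-scratch")   # where shuffle files are written
        .set("spark.shuffle.compress", "true"))         # one of the 'Shuffle Behavior' settings
sc = SparkContext(conf=conf)
{% endhighlight %}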
## RDD Persistence
@@ -1246,7 +1246,7 @@ efficiency. We recommend going through the following process to select one:
This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to
-make the objects much more space-efficient, but still reasonably fast to access.
+make the objects much more space-efficient, but still reasonably fast to access.
* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter
a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from
@@ -1345,7 +1345,7 @@ Accumulators are variables that are only "added" to through an associative opera
therefore be efficiently supported in parallel. They can be used to implement counters (as in
MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers
can add support for new types. If accumulators are created with a name, they will be
-displayed in Spark's UI. This can be useful for understanding the progress of
+displayed in Spark's UI. This can be useful for understanding the progress of
running stages (NOTE: this is not yet supported in Python).
An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
@@ -1474,8 +1474,8 @@ vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
</div>
-For accumulator updates performed inside <b>actions only</b>, Spark guarantees that each task's update to the accumulator
-will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware
+For accumulator updates performed inside <b>actions only</b>, Spark guarantees that each task's update to the accumulator
+will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware
that each task's update may be applied more than once if tasks or job stages are re-executed.
Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like `map()`. The below code fragment demonstrates this property:
@@ -1486,7 +1486,7 @@ Accumulators do not change the lazy evaluation model of Spark. If they are being
{% highlight scala %}
val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
-// Here, accum is still 0 because no actions have caused the `map` to be computed.
+// Here, accum is still 0 because no actions have caused the <code>map</code> to be computed.
{% endhighlight %}
</div>
@@ -1553,7 +1553,7 @@ Several changes were made to the Java API:
code that `extends Function` should `implement Function` instead.
* New variants of the `map` transformations, like `mapToPair` and `mapToDouble`, were added to create RDDs
of special data types.
-* Grouping operations like `groupByKey`, `cogroup` and `join` have changed from returning
+* Grouping operations like `groupByKey`, `cogroup` and `join` have changed from returning
`(Key, List<Value>)` pairs to `(Key, Iterable<Value>)`.
</div>