aboutsummaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-09-26 19:17:58 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-09-26 19:17:58 -0700
commit874a9fd407943c7102395cfc64762dfd0ecf9b00 (patch)
tree35ccd0bb96acfb17275b4a12c14ab611e91c1376 /docs
parentee71fa49c1d958f1723fd3fba59b8e247cb4af76 (diff)
downloadspark-874a9fd407943c7102395cfc64762dfd0ecf9b00.tar.gz
spark-874a9fd407943c7102395cfc64762dfd0ecf9b00.tar.bz2
spark-874a9fd407943c7102395cfc64762dfd0ecf9b00.zip
More updates to docs, including tuning guide
Diffstat (limited to 'docs')
-rwxr-xr-xdocs/_layouts/global.html1
-rw-r--r--docs/configuration.md174
-rwxr-xr-xdocs/css/main.css14
-rw-r--r--docs/ec2-scripts.md10
-rw-r--r--docs/index.md1
-rw-r--r--docs/scala-programming-guide.md63
-rw-r--r--docs/tuning.md168
7 files changed, 388 insertions, 43 deletions
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index dee7f65d0d..9c84edd840 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -63,6 +63,7 @@
<a href="{{HOME_PATH}}api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="configuration.html">Configuration</a></li>
+ <li><a href="tuning.html">Tuning</a></li>
<li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
<li><a href="contributing-to-spark.html">Contributing to Spark</a></li>
</ul>
diff --git a/docs/configuration.md b/docs/configuration.md
index 0f16676f6d..93a644910c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2,21 +2,181 @@
layout: global
title: Spark Configuration
---
-Spark is configured primarily through the `conf/spark-env.sh` script. This script doesn't exist in the Git repository, but you can create it by copying `conf/spark-env.sh.template`. Make sure the script is executable.
-Inside this script, you can set several environment variables:
+Spark provides three main locations to configure the system:
+
+* The [`conf/spark-env.sh` script](#environment-variables-in-spark-envsh), in which you can set environment variables
+ that affect how the JVM is launched, such as, most notably, the amount of memory per JVM.
+* [Java system properties](#system-properties), which control internal configuration parameters and can be set either
+ programmatically (by calling `System.setProperty` *before* creating a `SparkContext`) or through the
+ `SPARK_JAVA_OPTS` environment variable in `spark-env.sh`.
+* [Logging configuration](#configuring-logging), which is done through `log4j.properties`.
+
+
+# Environment Variables in spark-env.sh
+
+Spark determines how to initialize the JVM on worker nodes, or even on the local node when you run `spark-shell`,
+by running the `conf/spark-env.sh` script in the directory where it is installed. This script does not exist by default
+in the Git repository, but but you can create it by copying `conf/spark-env.sh.template`. Make sure that you make
+the copy executable.
+
+Inside `spark-env.sh`, you can set the following environment variables:
* `SCALA_HOME` to point to your Scala installation.
* `MESOS_NATIVE_LIBRARY` if you are [running on a Mesos cluster]({{HOME_PATH}}running-on-mesos.html).
* `SPARK_MEM` to set the amount of memory used per node (this should be in the same format as the JVM's -Xmx option, e.g. `300m` or `1g`)
-* `SPARK_JAVA_OPTS` to add JVM options. This includes system properties that you'd like to pass with `-D`.
+* `SPARK_JAVA_OPTS` to add JVM options. This includes any system properties that you'd like to pass with `-D`.
* `SPARK_CLASSPATH` to add elements to Spark's classpath.
* `SPARK_LIBRARY_PATH` to add search directories for native libraries.
-The `spark-env.sh` script is executed both when you submit jobs with `run`, when you start the interpreter with `spark-shell`, and on each worker node on a Mesos cluster to set up the environment for that worker.
+The most important things to set first will be `SCALA_HOME`, without which `spark-shell` cannot run, and `MESOS_NATIVE_LIBRARY`
+if running on Mesos. The next setting will probably be the memory (`SPARK_MEM`). Make sure you set it high enough to be able to run your job but lower than the total memory on the machines (leave at least 1 GB for the operating system).
+
+
+# System Properties
+
+To set a system property for configuring Spark, you need to either pass it with a -D flag to the JVM (for example `java -Dspark.cores.max=5 MyProgram`) or call `System.setProperty` in your code *before* creating your Spark context, as follows:
+
+{% highlight scala %}
+System.setProperty("spark.cores.max", "5")
+val sc = new SparkContext(...)
+{% endhighlight %}
+
+Most of the configurable system properties control internal settings that have reasonable default values. However,
+there are at least four properties that you will commonly want to control:
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+ <td>spark.serializer</td>
+ <td>spark.JavaSerializer</td>
+ <td>
+ Class to use for serializing objects that will be sent over the network or need to be cached
+ in serialized form. The default of Java serialization works with any Serializable Java object but is
+ quite slow, so we recommend <a href="{{HOME_PATH}}tuning.html">using <code>spark.KryoSerializer</code>
+ and configuring Kryo serialization</a> when speed is necessary. Can be any subclass of
+ <a href="{{HOME_PATH}}api/core/index.html#spark.Serializer"><code>spark.Serializer</code></a>).
+ </td>
+</tr>
+<tr>
+ <td>spark.kryo.registrator</td>
+ <td>(none)</td>
+ <td>
+ If you use Kryo serialization, set this class to register your custom classes with Kryo.
+ You need to set it to a class that extends
+ <a href="{{HOME_PATH}}api/core/index.html#spark.KryoRegistrator"><code>spark.KryoRegistrator</code></a>).
+ See the <a href="{{HOME_PATH}}tuning.html#data-serialization">tuning guide</a> for more details.
+ </td>
+</tr>
+<tr>
+ <td>spark.local.dir</td>
+ <td>/tmp</td>
+ <td>
+ Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored
+ on disk. This should be on a fast, local disk in your system.
+ </td>
+</tr>
+<tr>
+ <td>spark.cores.max</td>
+ <td>(infinite)</td>
+ <td>
+ When running on a <a href="{{BASE_PATH}}spark-standalone.html">standalone deploy cluster</a> or a
+ <a href="{{BASE_PATH}}running-on-mesos.html">Mesos cluster in "coarse-grained" sharing mode</a>,
+ how many CPU cores to request at most. The default will use all available cores.
+ </td>
+</tr>
+</table>
+
+
+Apart from these, the following properties are also available, and may be useful in some situations:
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+ <td>spark.mesos.coarse</td>
+ <td>false</td>
+ <td>
+ If set to "true", runs over Mesos clusters in
+ <a href="{{BASE_PATH}}running-on-mesos.html">"coarse-grained" sharing mode</a>,
+ where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task.
+ This gives lower-latency scheduling for short queries, but leaves resources in use for the whole
+ duration of the Spark job.
+ </td>
+</tr>
+<tr>
+ <td>spark.default.parallelism</td>
+ <td>8</td>
+ <td>
+ Default number of tasks to use for distributed shuffle operations (<code>groupByKey</code>,
+ <code>reduceByKey</code>, etc) when not set by user.
+ </td>
+</tr>
+<tr>
+ <td>spark.storage.memoryFraction</td>
+ <td>0.66</td>
+ <td>
+ Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
+ generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
+ it if you configure your own old generation size.
+ </td>
+</tr>
+<tr>
+ <td>spark.blockManager.parallelFetches</td>
+ <td>4</td>
+ <td>
+ Number of map output files to fetch concurrently from each reduce task.
+ </td>
+</tr>
+<tr>
+ <td>spark.closure.serializer</td>
+ <td>spark.JavaSerializer</td>
+ <td>
+ Serializer class to use for closures. Generally Java is fine unless your distributed functions
+ (e.g. map functions) reference large objects in the driver program.
+ </td>
+</tr>
+<tr>
+ <td>spark.kryoserializer.buffer.mb</td>
+ <td>32</td>
+ <td>
+ Maximum object size to allow within Kryo (the library needs to create a buffer at least as
+ large as the largest single object you'll serialize). Increase this if you get a "buffer limit
+ exceeded" exception inside Kryo. Note that there will be one buffer <i>per core</i> on each worker.
+ </td>
+<tr>
+ <td>spark.broadcast.factory</td>
+ <td>spark.broadcast. HttpBroadcastFactory</td>
+ <td>
+ Which broadcast implementation to use.
+ </td>
+</tr>
+<tr>
+ <td>spark.locality.wait</td>
+ <td>3000</td>
+ <td>
+ Number of milliseconds to wait to launch a data-local task before giving up and launching it
+ in a non-data-local location. You should increase this if your tasks are long and you are seeing
+ poor data locality, but the default generally works well.
+ </td>
+</tr>
+<tr>
+ <td>spark.master.host</td>
+ <td>(local hostname)</td>
+ <td>
+ Hostname for the master to listen on (it will bind to this hostname's IP address).
+ </td>
+</tr>
+<tr>
+ <td>spark.master.port</td>
+ <td>(random)</td>
+ <td>
+ Port for the master to listen on.
+ </td>
+</tr>
+</table>
-The most important thing to set first will probably be the memory (`SPARK_MEM`). Make sure you set it high enough to be able to run your job but lower than the total memory on the machines (leave at least 1 GB for the operating system).
-## Logging Configuration
+# Configuring Logging
-Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can configure it by adding a `log4j.properties` file in the `conf` directory. One way to start is to copy the existing `log4j.properties.template` located there.
+Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can configure it by adding a `log4j.properties`
+file in the `conf` directory. One way to start is to copy the existing `log4j.properties.template` located there.
diff --git a/docs/css/main.css b/docs/css/main.css
index c8aaa8ad22..84fe1d44ed 100755
--- a/docs/css/main.css
+++ b/docs/css/main.css
@@ -30,14 +30,21 @@ body #content {
h1 {
font-size: 28px;
+ margin-top: 12px;
}
h2 {
font-size: 24px;
+ margin-top: 12px;
}
h3 {
font-size: 21px;
+ margin-top: 10px;
+}
+
+pre {
+ font-family: "Menlo", "Lucida Console", monospace;
}
code {
@@ -45,15 +52,16 @@ code {
background: white;
border: none;
padding: 0;
- color: #902000;
+ color: #444444;
}
a code {
color: #0088cc;
}
-pre {
- font-family: "Menlo", "Lucida Console", monospace;
+a:hover code {
+ color: #005580;
+ text-decoration: underline;
}
.container {
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
index a1cc7985b4..faa89b5d56 100644
--- a/docs/ec2-scripts.md
+++ b/docs/ec2-scripts.md
@@ -99,6 +99,16 @@ permissions on your private key file, you can run `launch` with the
- Finally, if you get errors while running your jobs, look at the slave's logs
for that job using the Mesos web UI (`http://<master-hostname>:8080`).
+# Configuration
+
+You can edit `/root/spark/conf/spark-env.sh` on each machine to set Spark configuration options, such
+as JVM options and, most crucially, the amount of memory to use per machine (`SPARK_MEM`).
+This file needs to be copied to **every machine** to reflect the change. The easiest way to do this
+is to use a script we provide called `copy-dir`. First edit your `spark-env.sh` file on the master,
+then run `~/mesos-ec2/copy-dir /root/spark/conf` to RSYNC it to all the workers.
+
+The [configuration guide]({{HOME_PATH}}configuration.html) describes the available configuration options.
+
# Terminating a Cluster
***Note that there is no way to recover data on EC2 nodes after shutting
diff --git a/docs/index.md b/docs/index.md
index 26b2cc0840..795a180353 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -70,6 +70,7 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
**Other documents:**
* [Configuration]({{HOME_PATH}}configuration.html): customize Spark via its configuration system
+* [Tuning guide]({{HOME_PATH}}tuning.html): best practices to optimize performance and memory use
* [API docs (Scaladoc)]({{HOME_PATH}}api/core/index.html)
* [Bagel]({{HOME_PATH}}bagel-programming-guide.html): an implementation of Google's Pregel on Spark
* [Contributing to Spark](contributing-to-spark.html)
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index ee897072ca..ffc11d8972 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -132,7 +132,13 @@ The following tables list the transformations and actions currently supported (s
</tr>
<tr>
<td> <b>flatMap</b>(<i>func</i>) </td>
- <td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td></tr>
+ <td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td>
+</tr>
+<tr>
+ <td> <b>mapPartitions</b>(<i>func</i>) </td>
+ <td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type
+ Iterator[T] => Iterator[U] when running on an RDD of type T. </td>
+</tr>
<tr>
<td> <b>sample</b>(<i>withReplacement</i>, <i>fraction</i>, <i>seed</i>) </td>
<td> Sample a fraction <i>fraction</i> of the data, with or without replacement, using a given random number generator seed. </td>
@@ -143,7 +149,7 @@ The following tables list the transformations and actions currently supported (s
</tr>
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
- <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs.
+ <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. <br />
<b>Note:</b> By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
</td>
</tr>
@@ -152,6 +158,10 @@ The following tables list the transformations and actions currently supported (s
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
+ <td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) </td>
+ <td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
+</tr>
+<tr>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. </td>
</tr>
@@ -163,12 +173,10 @@ The following tables list the transformations and actions currently supported (s
<td> <b>cartesian</b>(<i>otherDataset</i>) </td>
<td> When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). </td>
</tr>
-<tr>
- <td> <b>sortByKey</b>([<i>ascending</i>]) </td>
- <td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
-</tr>
</table>
+A complete list of transformations is available in the [RDD API doc]({{HOME_PATH}}api/core/index.html#spark.RDD).
+
### Actions
<table class="table">
@@ -215,6 +223,8 @@ The following tables list the transformations and actions currently supported (s
</tr>
</table>
+A complete list of actions is available in the [RDD API doc]({{HOME_PATH}}api/core/index.html#spark.RDD).
+
## RDD Persistence
One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory across operations. When you persist an RDD, each node stores any slices of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for building iterative algorithms with Spark and for interactive use from the interpreter.
@@ -224,22 +234,22 @@ You can mark an RDD to be persisted using the `persist()` or `cache()` methods o
In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), or even replicate it across nodes. These levels are chosen by passing a [`spark.storage.StorageLevel`]({{HOME_PATH}}api/core/index.html#spark.storage.StorageLevel) object to `persist()`. The `cache()` method is a shorthand for using the default storage level, which is `StorageLevel.MEMORY_ONLY_DESER` (store deserialized objects in memory). The complete set of available storage levels is:
<table class="table">
-<tr><th>Storage Level</th><th>Meaning</th></tr>
+<tr><th style="width:30%">Storage Level</th><th>Meaning</th></tr>
<tr>
<td> MEMORY_ONLY_DESER </td>
- <td> Store RDD as deserialized Java objects in the JVM. If the entire RDD does not fit in memory, some partitions will
+ <td> Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will
not be cached and will be recomputed on the fly each time they're needed. This is the default level. </td>
</tr>
<tr>
<td> DISK_AND_MEMORY_DESER </td>
- <td> Store RDD as deserialized Java objects in the JVM. If the entire RDD does not fit in memory, store the
- partitions that don't fit on disk, and read them from there the next time they're needed. </td>
+ <td> Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the
+ partitions that don't fit on disk, and read them from there when they're needed. </td>
</tr>
<tr>
<td> MEMORY_ONLY </td>
- <td> Store RDD as <i>serialized</i> Java objects (that is, one large byte array per partition).
+ <td> Store RDD as <i>serialized</i> Java objects (that is, one byte array per partition).
This is generally more space-efficient than deserialized objects, especially when using a
- [fast serializer]({{HOME_PATH}}tuning.html), but more CPU-intensive to read.
+ <a href="{{HOME_PATH}}tuning.html">fast serializer</a>, but more CPU-intensive to read.
</td>
</tr>
<tr>
@@ -252,31 +262,15 @@ In addition, each RDD can be stored using a different *storage level*, allowing
<td> Store the RDD partitions only on disk. </td>
</tr>
<tr>
- <td> MEMORY_ONLY_DESER_2 </td>
- <td> Same as MEMORY_ONLY_DESER, but replicate to two nodes. </td>
-</tr>
-<tr>
- <td> DISK_AND_MEMORY_DESER_2 </td>
- <td> Same as DISK_AND_MEMORY_DESER, but replicate to two nodes. </td>
-</tr>
-<tr>
- <td> MEMORY_ONLY_2 </td>
- <td> Same as MEMORY_ONLY, but replicate to two nodes. </td>
-</tr>
-<tr>
- <td> DISK_AND_MEMORY_2 </td>
- <td> Same as DISK_AND_MEMORY, but replicate to two nodes. </td>
-</tr>
-<tr>
- <td> DISK_ONLY_2 </td>
- <td> Same as DISK_ONLY, but replicate to two nodes. </td>
+ <td> MEMORY_ONLY_DESER_2 / DISK_AND_MEMORY_DESER_2 / MEMORY_ONLY_2 / DISK_ONLY_2 / DISK_AND_MEMORY_2 </td>
+ <td> Same as the levels above, but replicate each partition on two nodes. </td>
</tr>
</table>
### Which Storage Level to Choose?
-As you can see, Spark supports a variety of storage levels that give different tradeoffs between memory usage
-and CPU efficiency. We recommend going through the following process to select one:
+Spark's storage levels are meant to provide different tradeoffs between memory usage and CPU efficiency.
+We recommend going through the following process to select one:
* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY_DESER`), leave them that way. This is the most
CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
@@ -333,5 +327,8 @@ res2: Int = 10
# Where to Go from Here
You can see some [example Spark programs](http://www.spark-project.org/examples.html) on the Spark website.
-
In addition, Spark includes several sample programs in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `run` script included in Spark -- for example, `./run spark.examples.SparkPi`. Each example program prints usage help when run without any arguments.
+
+For help on optimizing your program, the [configuration]({{HOME_PATH}}configuration.html) and
+[tuning]({{HOME_PATH}}tuning.html) guides provide information on best practices. They are especially important for
+making sure that your data is stored in memory in an efficient format.
diff --git a/docs/tuning.md b/docs/tuning.md
new file mode 100644
index 0000000000..292874ce59
--- /dev/null
+++ b/docs/tuning.md
@@ -0,0 +1,168 @@
+---
+layout: global
+title: Tuning Spark
+---
+
+Because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked
+by any resource in the cluster: CPU, network bandwidth, or memory.
+Most often, if the data fits in memory, the bottleneck is network bandwidth, but sometimes, you
+also need to do some tuning, such as
+[storing RDDs in serialized form]({{HOME_PATH}}scala-programming-guide#rdd-persistence), to
+make the memory usage smaller.
+This guide will cover two main topics: data serialization, which is crucial for good network
+performance, and memory tuning. We also sketch several smaller topics.
+
+# Data Serialization
+
+One of the most important concerns in any distributed program is the format of data sent across
+the network -- formats that are slow to serialize objects into, or consume a large number of
+bytes, will greatly slow down the computation.
+Often, this will be the first thing you should tune to optimize a Spark application.
+Spark aims to strike a balance between convenience (allowing you to work with any Java type
+in your operations) and performance. It provides two serialization libraries:
+
+* [Java serialization](http://docs.oracle.com/javase/6/docs/api/java/io/Serializable.html):
+ By default, Spark serializes objects using Java's `ObjectOutputStream` framework, and can work
+ with any class you create that implements
+ [`java.io.Serializable`](http://docs.oracle.com/javase/6/docs/api/java/io/Serializable.html).
+ You can also control the performance of your serialization more closely by extending
+ [`java.io.Externalizable`](http://docs.oracle.com/javase/6/docs/api/java/io/Externalizable.html).
+ Java serialization is flexible but often quite slow, and leads to large
+ serialized formats for many classes.
+* [Kryo serialization](http://code.google.com/p/kryo/wiki/V1Documentation): Spark can also use
+ the Kryo library (currently just version 1) to serialize objects more quickly. Kryo is significantly
+ faster and more compact than Java serialization (often as much as 10x), but does not support all
+ `Serializable` types and requires you to *register* the classes you'll use in the program in advance
+ for best performance.
+
+You can switch to using Kryo by calling `System.setProperty("spark.serializer", "spark.KryoSerializer")`
+*before* creating your SparkContext. The only reason it is not the default is because of the custom
+registration requirement, but we recommend trying it in any network-intensive application.
+
+Finally, to register your classes with Kryo, create a public class that extends
+[`spark.KryoRegistrator`]({{HOME_PATH}}api/core/index.html#spark.KryoRegistrator) and set the
+`spark.kryo.registrator` system property to point to it, as follows:
+
+{% highlight scala %}
+class MyRegistrator extends KryoRegistrator {
+ override def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[MyClass1])
+ kryo.register(classOf[MyClass2])
+ }
+}
+
+// Make sure to set these properties *before* creating a SparkContext!
+System.setProperty("spark.serializer", "spark.KryoSerializer")
+System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")
+val sc = new SparkContext(...)
+{% endhighlight %}
+
+The [Kryo documentation](http://code.google.com/p/kryo/wiki/V1Documentation) describes more advanced
+registration options, such as adding custom serialization code.
+
+If your objects are large, you may also need to increase the `spark.kryoserializer.buffer.mb`
+system property. The default is 32, but this value needs to be large enough to hold the *largest*
+object you will serialize.
+
+Finally, if you don't register your classes, Kryo will still work, but it will have to store the
+full class name with each object, which is wasteful.
+
+
+# Memory Tuning
+
+There are three considerations in tuning memory usage: the *amount* of memory used by your objects
+(you likely want your entire dataset to fit in memory), the *cost* of accessing those objects, and the
+overhead of *garbage collection* (if you have high turnover in terms of objects).
+
+By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space
+than the "raw" data inside their fields. This is due to several reasons:
+
+* Each distinct Java object has an "object header", which is about 16 bytes and contains information
+ such as a pointer to its class. For an object with very little data in it (say one `Int` field), this
+ can be bigger than the data.
+* Java Strings have about 40 bytes of overhead over the raw string data (since they store it in an
+ array of `Char`s and keep extra data such as the length), and store each character
+ as *two* bytes due to Unicode. Thus a 10-character string can easily consume 60 bytes.
+* Common collection classes, such as `HashMap` and `LinkedList`, use linked data structures, where
+ there is a "wrapper" object for each entry (e.g. `Map.Entry`). This object not only has a header,
+ but also pointers (typically 8 bytes each) to the next object in the list.
+* Collections of primitive types often store them as "boxed" objects such as `java.lang.Integer`.
+
+There are several ways to reduce this cost and still make Java objects efficient to work with:
+
+1. Design your data structures to prefer arrays of objects, and primitive types, instead of the
+ standard Java or Scala collection classes (e.g. `HashMap`). The [fastutil](http://fastutil.di.unimi.it)
+ library provides convenient collection classes for primitive types that are compatible with the
+ Java standard library.
+2. Avoid nested structures with a lot of small objects and pointers when possible.
+3. If you have less than 32 GB of RAM, set the JVM flag `-XX:+UseCompressedOops` to make pointers be
+ four bytes instead of eight. Also, on Java 7 or later, try `-XX:+UseCompressedStrings` to store
+ ASCII strings as just 8 bits per character. You can add these options in
+ [`spark-env.sh`]({{HOME_PATH}}configuration.html#environment-variables-in-spark-envsh).
+
+You can get a sense of the memory usage of each object by looking at the logs of your Spark worker
+nodes -- they will print the size of each RDD partition cached.
+
+When your objects are still too large to efficiently store despite this tuning, a much simpler way
+to reduce memory usage is to store them in *serialized* form, using the serialized StorageLevels in
+the [RDD persistence API]({{HOME_PATH}}scala-programming-guide#rdd-persistence).
+Spark will then store each RDD partition as one large byte array.
+The only downside of storing data in serialized form is slower access times, due to having to
+deserialize each object on the fly.
+We highly recommend [using Kryo](#data-serialization) if you want to cache data in serialized form, as
+it leads to much smaller sizes than Java serialization (and certainly than raw Java objects).
+
+Finally, JVM garbage collection can be a problem when you have large "churn" in terms of the RDDs
+stored by your program. (It is generally not a problem in programs that just read an RDD once
+and then run many operations on it.) When Java needs to evict old objects to make room for new ones, it will
+need to trace through all your Java objects and find the unused ones. The main point to remember here is
+that *the cost of garbage collection is proportional to the number of Java objects*, so using data
+structures with fewer objects (e.g. an array of `Int`s instead of a `LinkedList`) greatly reduces
+this cost. An even better method is to persist objects in serialized form, as described above: now
+there will be only *one* object (a byte array) per RDD partition. There is a lot of
+[detailed information on GC tuning](http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html)
+available online, but at a high level, the first thing to try if GC is a problem is to use serialized caching.
+
+
+# Other Considerations
+
+## Level of Parallelism
+
+Clusters will not be fully utilized unless you set the level of parallelism for each operation high
+enough. Spark automatically sets the number of "map" tasks to run on each file according to its size
+(though you can control it through optional parameters to `SparkContext.textFile`, etc), but for
+distributed "reduce" operations, such as `groupByKey` and `reduceByKey`, it uses a default value of 8.
+You can pass the level of parallelism as a second argument (see the
+[`spark.PairRDDFunctions`]({{HOME_PATH}}api/core/index.html#spark.PairRDDFunctions) documentation),
+or set the system property `spark.default.parallelism` to change the default.
+In general, we recommend 2-3 tasks per CPU core in your cluster.
+
+## Memory Usage of Reduce Tasks
+
+Sometimes, you will get an OutOfMemoryError not because your RDDs don't fit in memory, but because the
+working set of one of your tasks, such as one of the reduce tasks in `groupByKey`, was too large.
+Spark's shuffle operations (`sortByKey`, `groupByKey`, `reduceByKey`, `join`, etc) build a hash table
+within each task to perform the grouping, which can often be large. The simplest fix here is to
+*increase the level of parallelism*, so that each task's input set is smaller. Spark can efficiently
+support tasks as short as 200 ms, because it reuses one worker JVMs across all tasks and it has
+a low task launching cost, so you can safely increase the level of parallelism to more than the
+number of cores in your clusters.
+
+## Broadcasting Large Variables
+
+Using the [broadcast functionality]({{HOME_PATH}}scala-programming-guide#broadcast-variables)
+available in `SparkContext` can greatly reduce the size of each serialized task, and the cost
+of launching a job over a cluster. If your tasks use any large object from the driver program
+inside of them (e.g. a static lookup table), consider turning it into a broadcast variable.
+Spark prints the serialized size of each task on the master, so you can look at that to
+decide whether your tasks are too large; in general tasks larger than about 20 KB are probably
+worth optimizing.
+
+
+# Summary
+
+This has been a quick guide to point out the main concerns you should know about when tuning a
+Spark application -- most importantly, data serialization and memory tuning. For most programs,
+switching to Kryo serialization and persisting data in serialized form will solve most common
+performance issues. Feel free to ask on the
+[Spark mailing list](http://groups.google.com/group/spark-users) about other tuning best practices. \ No newline at end of file