aboutsummaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
Diffstat (limited to 'docs')
-rw-r--r--docs/cluster-overview.md14
-rw-r--r--docs/configuration.md10
-rw-r--r--docs/ec2-scripts.md2
-rw-r--r--docs/python-programming-guide.md11
-rw-r--r--docs/running-on-yarn.md9
-rw-r--r--docs/scala-programming-guide.md6
-rw-r--r--docs/spark-standalone.md75
-rw-r--r--docs/streaming-programming-guide.md9
-rw-r--r--docs/tuning.md2
9 files changed, 127 insertions, 11 deletions
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index f679cad713..5927f736f3 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -13,7 +13,7 @@ object in your main program (called the _driver program_).
Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_
(either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across
applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are
-worker processes that run computations and store data for your application.
+worker processes that run computations and store data for your application.
Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to
the executors. Finally, SparkContext sends *tasks* for the executors to run.
@@ -57,6 +57,18 @@ which takes a list of JAR files (Java/Scala) or .egg and .zip libraries (Python)
worker nodes. You can also dynamically add new files to be sent to executors with `SparkContext.addJar`
and `addFile`.
+## URIs for addJar / addFile
+
+- **file:** - Absolute paths and `file:/` URIs are served by the driver's HTTP file server, and every executor
+ pulls the file from the driver HTTP server
+- **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected
+- **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This
+ means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker,
+ or shared via NFS, GlusterFS, etc.
+
+Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
+Over time this can use up a significant amount of space and will need to be cleaned up.
+
# Monitoring
Each driver program has a web UI, typically on port 4040, that displays information about running
diff --git a/docs/configuration.md b/docs/configuration.md
index 7940d41a27..97183bafdb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -149,7 +149,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.io.compression.codec</td>
<td>org.apache.spark.io.<br />LZFCompressionCodec</td>
<td>
- The compression codec class to use for various compressions. By default, Spark provides two
+ The codec used to compress internal data such as RDD partitions and shuffle outputs. By default, Spark provides two
codecs: <code>org.apache.spark.io.LZFCompressionCodec</code> and <code>org.apache.spark.io.SnappyCompressionCodec</code>.
</td>
</tr>
@@ -319,6 +319,14 @@ Apart from these, the following properties are also available, and may be useful
Should be greater than or equal to 1. Number of allowed retries = this value - 1.
</td>
</tr>
+<tr>
+ <td>spark.broadcast.blockSize</td>
+ <td>4096</td>
+ <td>
+ Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
+ Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
+ </td>
+</tr>
</table>
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
index 1e5575d657..156a727026 100644
--- a/docs/ec2-scripts.md
+++ b/docs/ec2-scripts.md
@@ -98,7 +98,7 @@ permissions on your private key file, you can run `launch` with the
`bin/hadoop` script in that directory. Note that the data in this
HDFS goes away when you stop and restart a machine.
- There is also a *persistent HDFS* instance in
- `/root/presistent-hdfs` that will keep data across cluster restarts.
+ `/root/persistent-hdfs` that will keep data across cluster restarts.
Typically each node has relatively little space of persistent data
(about 3 GB), but you can use the `--ebs-vol-size` option to
`spark-ec2` to attach a persistent EBS volume to each node for
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 6c2336ad0c..55e39b1de1 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -131,6 +131,17 @@ sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg
Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines.
Code dependencies can be added to an existing SparkContext using its `addPyFile()` method.
+You can set [system properties](configuration.html#system-properties)
+using `SparkContext.setSystemProperty()` class method *before*
+instantiating SparkContext. For example, to set the amount of memory
+per executor process:
+
+{% highlight python %}
+from pyspark import SparkContext
+SparkContext.setSystemProperty('spark.executor.memory', '2g')
+sc = SparkContext("local", "App Name")
+{% endhighlight %}
+
# API Docs
[API documentation](api/pyspark/index.html) for PySpark is available as Epydoc.
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 30128ec45d..2898af0bed 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -34,6 +34,8 @@ Environment variables:
System Properties:
* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10.
+* 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
+* 'spark.yarn.preserve.staging.files', set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
# Launching Spark on YARN
@@ -51,7 +53,10 @@ The command to launch the YARN Client is as follows:
--worker-memory <MEMORY_PER_WORKER> \
--worker-cores <CORES_PER_WORKER> \
--name <application_name> \
- --queue <queue_name>
+ --queue <queue_name> \
+ --addJars <any_local_files_used_in_SparkContext.addJar> \
+ --files <files_for_distributed_cache> \
+ --archives <archives_for_distributed_cache>
For example:
@@ -84,3 +89,5 @@ The above starts a YARN Client programs which periodically polls the Application
- When your application instantiates a Spark context it must use a special "yarn-standalone" master url. This starts the scheduler without forcing it to connect to a cluster. A good way to handle this is to pass "yarn-standalone" as an argument to your program, as shown in the example above.
- We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed.
- The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
+- The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN.
+- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 03647a2ad2..94e8563a8b 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -142,7 +142,7 @@ All transformations in Spark are <i>lazy</i>, in that they do not compute their
By default, each transformed RDD is recomputed each time you run an action on it. However, you may also *persist* an RDD in memory using the `persist` (or `cache`) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting datasets on disk, or replicated across the cluster. The next section in this document describes these options.
-The following tables list the transformations and actions currently supported (see also the [RDD API doc](api/core/index.html#org.apache.spark.RDD) for details):
+The following tables list the transformations and actions currently supported (see also the [RDD API doc](api/core/index.html#org.apache.spark.rdd.RDD) for details):
### Transformations
@@ -211,7 +211,7 @@ The following tables list the transformations and actions currently supported (s
</tr>
</table>
-A complete list of transformations is available in the [RDD API doc](api/core/index.html#org.apache.spark.RDD).
+A complete list of transformations is available in the [RDD API doc](api/core/index.html#org.apache.spark.rdd.RDD).
### Actions
@@ -259,7 +259,7 @@ A complete list of transformations is available in the [RDD API doc](api/core/in
</tr>
</table>
-A complete list of actions is available in the [RDD API doc](api/core/index.html#org.apache.spark.RDD).
+A complete list of actions is available in the [RDD API doc](api/core/index.html#org.apache.spark.rdd.RDD).
## RDD Persistence
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 81cdbefd0c..17066ef0dd 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -3,6 +3,9 @@ layout: global
title: Spark Standalone Mode
---
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
In addition to running on the Mesos or YARN cluster managers, Spark also provides a simple standalone deploy mode. You can launch a standalone cluster either manually, by starting a master and workers by hand, or use our provided [launch scripts](#cluster-launch-scripts). It is also possible to run these daemons on a single machine for testing.
# Installing Spark Standalone to a Cluster
@@ -169,3 +172,75 @@ In addition, detailed log output for each job is also written to the work direct
You can run Spark alongside your existing Hadoop cluster by just launching it as a separate service on the same machines. To access Hadoop data from Spark, just use a hdfs:// URL (typically `hdfs://<namenode>:9000/path`, but you can find the right URL on your Hadoop Namenode's web UI). Alternatively, you can set up a separate cluster for Spark, and still have it access HDFS over the network; this will be slower than disk-local access, but may not be a concern if you are still running in the same local area network (e.g. you place a few Spark machines on each rack that you have Hadoop on).
+
+# High Availability
+
+By default, standalone scheduling clusters are resilient to Worker failures (insofar as Spark itself is resilient to losing work by moving it to other workers). However, the scheduler uses a Master to make scheduling decisions, and this (by default) creates a single point of failure: if the Master crashes, no new applications can be created. In order to circumvent this, we have two high availability schemes, detailed below.
+
+## Standby Masters with ZooKeeper
+
+**Overview**
+
+Utilizing ZooKeeper to provide leader election and some state storage, you can launch multiple Masters in your cluster connected to the same ZooKeeper instance. One will be elected "leader" and the others will remain in standby mode. If the current leader dies, another Master will be elected, recover the old Master's state, and then resume scheduling. The entire recovery process (from the time the the first leader goes down) should take between 1 and 2 minutes. Note that this delay only affects scheduling _new_ applications -- applications that were already running during Master failover are unaffected.
+
+Learn more about getting started with ZooKeeper [here](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html).
+
+**Configuration**
+
+In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env using this configuration:
+
+<table class="table">
+ <tr><th style="width:21%">System property</th><th>Meaning</th></tr>
+ <tr>
+ <td><code>spark.deploy.recoveryMode</code></td>
+ <td>Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).</td>
+ </tr>
+ <tr>
+ <td><code>spark.deploy.zookeeper.url</code></td>
+ <td>The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).</td>
+ </tr>
+ <tr>
+ <td><code>spark.deploy.zookeeper.dir</code></td>
+ <td>The directory in ZooKeeper to store recovery state (default: /spark).</td>
+ </tr>
+</table>
+
+Possible gotcha: If you have multiple Masters in your cluster but fail to correctly configure the Masters to use ZooKeeper, the Masters will fail to discover each other and think they're all leaders. This will not lead to a healthy cluster state (as all Masters will schedule independently).
+
+**Details**
+
+After you have a ZooKeeper cluster set up, enabling high availability is straightforward. Simply start multiple Master processes on different nodes with the same ZooKeeper configuration (ZooKeeper URL and directory). Masters can be added and removed at any time.
+
+In order to schedule new applications or add Workers to the cluster, they need to know the IP address of the current leader. This can be accomplished by simply passing in a list of Masters where you used to pass in a single one. For example, you might start your SparkContext pointing to ``spark://host1:port1,host2:port2``. This would cause your SparkContext to try registering with both Masters -- if ``host1`` goes down, this configuration would still be correct as we'd find the new leader, ``host2``.
+
+There's an important distinction to be made between "registering with a Master" and normal operation. When starting up, an application or Worker needs to be able to find and register with the current lead Master. Once it successfully registers, though, it is "in the system" (i.e., stored in ZooKeeper). If failover occurs, the new leader will contact all previously registered applications and Workers to inform them of the change in leadership, so they need not even have known of the existence of the new Master at startup.
+
+Due to this property, new Masters can be created at any time, and the only thing you need to worry about is that _new_ applications and Workers can find it to register with in case it becomes the leader. Once registered, you're taken care of.
+
+## Single-Node Recovery with Local File System
+
+**Overview**
+
+ZooKeeper is the best way to go for production-level high availability, but if you just want to be able to restart the Master if it goes down, FILESYSTEM mode can take care of it. When applications and Workers register, they have enough state written to the provided directory so that they can be recovered upon a restart of the Master process.
+
+**Configuration**
+
+In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env using this configuration:
+
+<table class="table">
+ <tr><th style="width:21%">System property</th><th>Meaning</th></tr>
+ <tr>
+ <td><code>spark.deploy.recoveryMode</code></td>
+ <td>Set to FILESYSTEM to enable single-node recovery mode (default: NONE).</td>
+ </tr>
+ <tr>
+ <td><code>spark.deploy.recoveryDirectory</code></td>
+ <td>The directory in which Spark will store recovery state, accessible from the Master's perspective.</td>
+ </tr>
+</table>
+
+**Details**
+
+* This solution can be used in tandem with a process monitor/manager like [monit](http://mmonit.com/monit/), or just to enable manual recovery via restart.
+* While filesystem recovery seems straightforwardly better than not doing any recovery at all, this mode may be suboptimal for certain development or experimental purposes. In particular, killing a master via stop-master.sh does not clean up its recovery state, so whenever you start a new Master, it will enter recovery mode. This could increase the startup time by up to 1 minute if it needs to wait for all previously-registered Workers/clients to timeout.
+* While it's not officially supported, you could mount an NFS directory as the recovery directory. If the original Master node dies completely, you could then start a Master on a different node, which would correctly recover all previously registered Workers/applications (equivalent to ZooKeeper recovery). Future applications will have to be able to find the new Master, however, in order to register.
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index c7df172024..851e30fe76 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -73,6 +73,10 @@ DStreams support many of the transformations available on normal Spark RDD's:
Iterator[T] => Iterator[U] when running on an DStream of type T. </td>
</tr>
<tr>
+ <td> <b>repartition</b>(<i>numPartitions</i>) </td>
+ <td> Changes the level of parallelism in this DStream by creating more or fewer partitions. </td>
+</tr>
+<tr>
<td> <b>union</b>(<i>otherStream</i>) </td>
<td> Return a new DStream that contains the union of the elements in the source DStream and the argument DStream. </td>
</tr>
@@ -122,12 +126,12 @@ Spark Streaming features windowed computations, which allow you to apply transfo
<table class="table">
<tr><th style="width:30%">Transformation</th><th>Meaning</th></tr>
<tr>
- <td> <b>window</b>(<i>windowDuration</i>, </i>slideDuration</i>) </td>
+ <td> <b>window</b>(<i>windowDuration</i>, <i>slideDuration</i>) </td>
<td> Return a new DStream which is computed based on windowed batches of the source DStream. <i>windowDuration</i> is the width of the window and <i>slideTime</i> is the frequency during which the window is calculated. Both times must be multiples of the batch interval.
</td>
</tr>
<tr>
- <td> <b>countByWindow</b>(<i>windowDuration</i>, </i>slideDuration</i>) </td>
+ <td> <b>countByWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>) </td>
<td> Return a sliding count of elements in the stream. <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
@@ -161,7 +165,6 @@ Spark Streaming features windowed computations, which allow you to apply transfo
<i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
-
</table>
A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.DStream) and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.PairDStreamFunctions).
diff --git a/docs/tuning.md b/docs/tuning.md
index 28d88a2659..f491ae9b95 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -175,7 +175,7 @@ To further tune garbage collection, we first need to understand some basic infor
* Java Heap space is divided in to two regions Young and Old. The Young generation is meant to hold short-lived objects
while the Old generation is intended for objects with longer lifetimes.
-* The Young generation is further divided into three regions [Eden, Survivor1, Survivor2].
+* The Young generation is further divided into three regions \[Eden, Survivor1, Survivor2\].
* A simplified description of the garbage collection procedure: When Eden is full, a minor GC is run on Eden and objects
that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor regions are swapped. If an object is old