aboutsummaryrefslogtreecommitdiff
path: root/docs/scala-programming-guide.md
diff options
context:
space:
mode:
authorHaoyuan Li <haoyuan@cs.berkeley.edu>2014-04-04 20:36:24 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-04 20:38:20 -0700
commitb50ddfde0342990979979e58348f54c10b500c90 (patch)
treecc7fa4d089375cded5056d9a93079e0b23a32ae7 /docs/scala-programming-guide.md
parent1347ebd4b52ffb9197fc4137a55dff6badb149ba (diff)
downloadspark-b50ddfde0342990979979e58348f54c10b500c90.tar.gz
spark-b50ddfde0342990979979e58348f54c10b500c90.tar.bz2
spark-b50ddfde0342990979979e58348f54c10b500c90.zip
SPARK-1305: Support persisting RDD's directly to Tachyon
Move the PR#468 of apache-incubator-spark to the apache-spark "Adding an option to persist Spark RDD blocks into Tachyon." Author: Haoyuan Li <haoyuan@cs.berkeley.edu> Author: RongGu <gurongwalker@gmail.com> Closes #158 from RongGu/master and squashes the following commits: 72b7768 [Haoyuan Li] merge master 9f7fa1b [Haoyuan Li] fix code style ae7834b [Haoyuan Li] minor cleanup a8b3ec6 [Haoyuan Li] merge master branch e0f4891 [Haoyuan Li] better check offheap. 55b5918 [RongGu] address matei's comment on the replication of offHeap storagelevel 7cd4600 [RongGu] remove some logic code for tachyonstore's replication 51149e7 [RongGu] address aaron's comment on returning value of the remove() function in tachyonstore 8adfcfa [RongGu] address arron's comment on inTachyonSize 120e48a [RongGu] changed the root-level dir name in Tachyon 5cc041c [Haoyuan Li] address aaron's comments 9b97935 [Haoyuan Li] address aaron's comments d9a6438 [Haoyuan Li] fix for pspark 77d2703 [Haoyuan Li] change python api.git status 3dcace4 [Haoyuan Li] address matei's comments 91fa09d [Haoyuan Li] address patrick's comments 589eafe [Haoyuan Li] use TRY_CACHE instead of MUST_CACHE 64348b2 [Haoyuan Li] update conf docs. ed73e19 [Haoyuan Li] Merge branch 'master' of github.com:RongGu/spark-1 619a9a8 [RongGu] set number of directories in TachyonStore back to 64; added a TODO tag for duplicated code from the DiskStore be79d77 [RongGu] find a way to clean up some unnecessay metods and classed to make the code simpler 49cc724 [Haoyuan Li] update docs with off_headp option 4572f9f [RongGu] reserving the old apply function API of StorageLevel 04301d3 [RongGu] rename StorageLevel.TACHYON to Storage.OFF_HEAP c9aeabf [RongGu] rename the StorgeLevel.TACHYON as StorageLevel.OFF_HEAP 76805aa [RongGu] unifies the config properties name prefix; add the configs into docs/configuration.md e700d9c [RongGu] add the SparkTachyonHdfsLR example and some comments fd84156 [RongGu] use randomUUID to generate sparkapp directory name on tachyon;minor code style fix 939e467 [Haoyuan Li] 0.4.1-thrift from maven central 86a2eab [Haoyuan Li] tachyon 0.4.1-thrift is in the staging repo. but jenkins failed to download it. temporarily revert it back to 0.4.1 16c5798 [RongGu] make the dependency on tachyon as tachyon-0.4.1-thrift eacb2e8 [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1 bbeb4de [RongGu] fix the JsonProtocolSuite test failure problem 6adb58f [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1 d827250 [RongGu] fix JsonProtocolSuie test failure 716e93b [Haoyuan Li] revert the version ca14469 [Haoyuan Li] bump tachyon version to 0.4.1-thrift 2825a13 [RongGu] up-merging to the current master branch of the apache spark 6a22c1a [Haoyuan Li] fix scalastyle 8968b67 [Haoyuan Li] exclude more libraries from tachyon dependency to be the same as referencing tachyon-client. 77be7e8 [RongGu] address mateiz's comment about the temp folder name problem. The implementation followed mateiz's advice. 1dcadf9 [Haoyuan Li] typo bf278fa [Haoyuan Li] fix python tests e82909c [Haoyuan Li] minor cleanup 776a56c [Haoyuan Li] address patrick's and ali's comments from the previous PR 8859371 [Haoyuan Li] various minor fixes and clean up e3ddbba [Haoyuan Li] add doc to use Tachyon cache mode. fcaeab2 [Haoyuan Li] address Aaron's comment e554b1e [Haoyuan Li] add python code 47304b3 [Haoyuan Li] make tachyonStore in BlockMananger lazy val; add more comments StorageLevels. dc8ef24 [Haoyuan Li] add old storelevel constructor e01a271 [Haoyuan Li] update tachyon 0.4.1 8011a96 [RongGu] fix a brought-in mistake in StorageLevel 70ca182 [RongGu] a bit change in comment 556978b [RongGu] fix the scalastyle errors 791189b [RongGu] "Adding an option to persist Spark RDD blocks into Tachyon." move the PR#468 of apache-incubator-spark to the apache-spark
Diffstat (limited to 'docs/scala-programming-guide.md')
-rw-r--r--docs/scala-programming-guide.md127
1 files changed, 93 insertions, 34 deletions
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 99412733d4..77373890ee 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -23,7 +23,7 @@ To write a Spark application, you need to add a dependency on Spark. If you use
groupId = org.apache.spark
artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
- version = {{site.SPARK_VERSION}}
+ version = {{site.SPARK_VERSION}}
In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS:
@@ -73,14 +73,14 @@ The master URL passed to Spark can be in one of the following formats:
<table class="table">
<tr><th>Master URL</th><th>Meaning</th></tr>
<tr><td> local </td><td> Run Spark locally with one worker thread (i.e. no parallelism at all). </td></tr>
-<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
+<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
</td></tr>
-<tr><td> spark://HOST:PORT </td><td> Connect to the given <a href="spark-standalone.html">Spark standalone
- cluster</a> master. The port must be whichever one your master is configured to use, which is 7077 by default.
+<tr><td> spark://HOST:PORT </td><td> Connect to the given <a href="spark-standalone.html">Spark standalone
+ cluster</a> master. The port must be whichever one your master is configured to use, which is 7077 by default.
</td></tr>
-<tr><td> mesos://HOST:PORT </td><td> Connect to the given <a href="running-on-mesos.html">Mesos</a> cluster.
- The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use,
- which is 5050 by default.
+<tr><td> mesos://HOST:PORT </td><td> Connect to the given <a href="running-on-mesos.html">Mesos</a> cluster.
+ The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use,
+ which is 5050 by default.
</td></tr>
</table>
@@ -265,11 +265,25 @@ A complete list of actions is available in the [RDD API doc](api/core/index.html
## RDD Persistence
-One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory across operations. When you persist an RDD, each node stores any slices of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for building iterative algorithms with Spark and for interactive use from the interpreter.
-
-You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant -- if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
-
-In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), or even replicate it across nodes. These levels are chosen by passing a [`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel) object to `persist()`. The `cache()` method is a shorthand for using the default storage level, which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of available storage levels is:
+One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory
+across operations. When you persist an RDD, each node stores any slices of it that it computes in
+memory and reuses them in other actions on that dataset (or datasets derived from it). This allows
+future actions to be much faster (often by more than 10x). Caching is a key tool for building
+iterative algorithms with Spark and for interactive use from the interpreter.
+
+You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time
+it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant --
+if any partition of an RDD is lost, it will automatically be recomputed using the transformations
+that originally created it.
+
+In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to
+persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space),
+or replicate it across nodes, or store the data in off-heap memory in [Tachyon](http://tachyon-project.org/).
+These levels are chosen by passing a
+[`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel)
+object to `persist()`. The `cache()` method is a shorthand for using the default storage level,
+which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of
+available storage levels is:
<table class="table">
<tr><th style="width:23%">Storage Level</th><th>Meaning</th></tr>
@@ -292,8 +306,16 @@ In addition, each RDD can be stored using a different *storage level*, allowing
</tr>
<tr>
<td> MEMORY_AND_DISK_SER </td>
- <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them
- on the fly each time they're needed. </td>
+ <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of
+ recomputing them on the fly each time they're needed. </td>
+</tr>
+<tr>
+ <td> OFF_HEAP </td>
+ <td> Store RDD in a <i>serialized</i> format in Tachyon.
+ This is generally more space-efficient than deserialized objects, especially when using a
+ <a href="tuning.html">fast serializer</a>, but more CPU-intensive to read.
+ This also significantly reduces the overheads of GC.
+ </td>
</tr>
<tr>
<td> DISK_ONLY </td>
@@ -307,30 +329,59 @@ In addition, each RDD can be stored using a different *storage level*, allowing
### Which Storage Level to Choose?
-Spark's storage levels are meant to provide different tradeoffs between memory usage and CPU efficiency.
-We recommend going through the following process to select one:
-
-* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way. This is the most
- CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
-* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to make the objects
- much more space-efficient, but still reasonably fast to access.
-* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter a large
- amount of the data. Otherwise, recomputing a partition is about as fast as reading it from disk.
-* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web
- application). *All* the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones
- let you continue running tasks on the RDD without waiting to recompute a lost partition.
-
-If you want to define your own storage level (say, with replication factor of 3 instead of 2), then use the function factor method `apply()` of the [`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object.
+Spark's storage levels are meant to provide different trade-offs between memory usage and CPU
+efficiency. It allows uses to choose memory, disk, or Tachyon for storing data. We recommend going
+through the following process to select one:
+
+* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way.
+ This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
+
+* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to
+make the objects much more space-efficient, but still reasonably fast to access. You can also use
+`OFF_HEAP` mode to store the data off the heap in [Tachyon](http://tachyon-project.org/). This will
+significantly reduce JVM GC overhead.
+
+* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter
+a large amount of the data. Otherwise, recomputing a partition is about as fast as reading it from
+disk.
+
+* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve
+requests from a web application). *All* the storage levels provide full fault tolerance by
+recomputing lost data, but the replicated ones let you continue running tasks on the RDD without
+waiting to recompute a lost partition.
+
+If you want to define your own storage level (say, with replication factor of 3 instead of 2), then
+use the function factor method `apply()` of the
+[`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object.
+
+Spark has a block manager inside the Executors that let you chose memory, disk, or off-heap. The
+latter is for storing RDDs off-heap outside the Executor JVM on top of the memory management system
+[Tachyon](http://tachyon-project.org/). This mode has the following advantages:
+
+* Cached data will not be lost if individual executors crash.
+* Executors can have a smaller memory footprint, allowing you to run more executors on the same
+machine as the bulk of the memory will be inside Tachyon.
+* Reduced GC overhead since data is stored in Tachyon.
# Shared Variables
-Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of *shared variables* for two common usage patterns: broadcast variables and accumulators.
+Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a
+remote cluster node, it works on separate copies of all the variables used in the function. These
+variables are copied to each machine, and no updates to the variables on the remote machine are
+propagated back to the driver program. Supporting general, read-write shared variables across tasks
+would be inefficient. However, Spark does provide two limited types of *shared variables* for two
+common usage patterns: broadcast variables and accumulators.
## Broadcast Variables
-Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
+Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather
+than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a
+large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables
+using efficient broadcast algorithms to reduce communication cost.
-Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` method. The interpreter session below shows this:
+Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The
+broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value`
+method. The interpreter session below shows this:
{% highlight scala %}
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
@@ -340,13 +391,21 @@ scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
{% endhighlight %}
-After the broadcast variable is created, it should be used instead of the value `v` in any functions run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object `v` should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
+After the broadcast variable is created, it should be used instead of the value `v` in any functions
+run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object
+`v` should not be modified after it is broadcast in order to ensure that all nodes get the same
+value of the broadcast variable (e.g. if the variable is shipped to a new node later).
## Accumulators
-Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can add support for new types.
+Accumulators are variables that are only "added" to through an associative operation and can
+therefore be efficiently supported in parallel. They can be used to implement counters (as in
+MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable
+collections, and programmers can add support for new types.
-An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it using the `+=` operator. However, they cannot read its value. Only the driver program can read the accumulator's value, using its `value` method.
+An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
+running on the cluster can then add to it using the `+=` operator. However, they cannot read its
+value. Only the driver program can read the accumulator's value, using its `value` method.
The interpreter session below shows an accumulator being used to add up the elements of an array: