author    Reza Zadeh <rizlar@gmail.com>    2014-01-17 14:34:03 -0800
committer Reza Zadeh <rizlar@gmail.com>    2014-01-17 14:34:03 -0800
commit    caf97a25a2bd70ef5164c3ce0e8b59a8e39eb288 (patch)
tree      3eaffca1eb0c9031c4f9acc2b91ea915beec25b1 /docs
parent    4e96757793e7aee165381f80a60b3f46f60c9ebc (diff)
parent    d749d472b37448edb322bc7208a3db925c9a4fc2 (diff)
Merge remote-tracking branch 'upstream/master' into sparsesvd
Diffstat (limited to 'docs')
-rw-r--r--   docs/building-with-maven.md            6
-rw-r--r--   docs/configuration.md                  2
-rw-r--r--   docs/graphx-programming-guide.md     315
-rw-r--r--   docs/mllib-guide.md                    3
-rw-r--r--   docs/python-programming-guide.md       4
-rw-r--r--   docs/streaming-programming-guide.md   32
-rw-r--r--   docs/tuning.md                         3
7 files changed, 228 insertions, 137 deletions
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index b9ff0af76f..6a9a8d6817 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -71,8 +71,8 @@ This setup works fine in IntelliJ IDEA 11.1.4. After opening the project via the
## Building Spark Debian Packages ##
-It includes support for building a Debian package containing a 'fat-jar' which includes the repl, the examples and bagel. This can be created by specifying the following profiles:
+The Maven build includes support for building a Debian package containing the assembly 'fat-jar', PySpark, and the necessary scripts and configuration files. This can be created by specifying the following:
- $ mvn -Prepl-bin -Pdeb clean package
+ $ mvn -Pdeb -DskipTests clean package
-The debian package can then be found under repl/target. We added the short commit hash to the file name so that we can distinguish individual packages build for SNAPSHOT versions.
+The Debian package can then be found under assembly/target. We added the short commit hash to the file name so that we can distinguish individual packages built for SNAPSHOT versions.
diff --git a/docs/configuration.md b/docs/configuration.md
index be06bd19be..da70cabba2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -156,7 +156,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td>spark.shuffle.spill.compress</td>
- <td>false</td>
+ <td>true</td>
<td>
Whether to compress data spilled during shuffles.
</td>
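Since this change flips the default to `true`, a rough sketch of how an application could opt back out (the app name is illustrative; the property name comes from the row above):

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

// Restore the previous behavior by overriding the new default of true.
val conf = new SparkConf()
  .setAppName("ShuffleSpillCompression") // illustrative app name
  .set("spark.shuffle.spill.compress", "false")
val sc = new SparkContext(conf)
{% endhighlight %}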
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 9fbde4eb09..3dfed7bea9 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -18,7 +18,7 @@ title: GraphX Programming Guide
GraphX is the new (alpha) Spark API for graphs and graph-parallel computation. At a high-level,
GraphX extends the Spark [RDD](api/core/index.html#org.apache.spark.rdd.RDD) by introducing the
-[Resilient Distributed property Graph (RDG)](#property_graph): a directed multigraph with properties
+[Resilient Distributed Property Graph](#property_graph): a directed multigraph with properties
attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental
operators (e.g., [subgraph](#structural_operators), [joinVertices](#join_operators), and
[mapReduceTriplets](#mrTriplets)) as well as an optimized variant of the [Pregel](#pregel) API. In
@@ -29,7 +29,7 @@ addition, GraphX includes a growing collection of graph [algorithms](#graph_algo
From social networks to language modeling, the growing scale and importance of
graph data has driven the development of numerous new *graph-parallel* systems
-(e.g., [Giraph](http://http://giraph.apache.org) and
+(e.g., [Giraph](http://giraph.apache.org) and
[GraphLab](http://graphlab.org)). By restricting the types of computation that can be
expressed and introducing new techniques to partition and distribute graphs,
these systems can efficiently execute sophisticated graph algorithms orders of
@@ -43,12 +43,25 @@ magnitude faster than more general *data-parallel* systems.
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>
-However, the same restrictions that enable these substantial performance gains
-also make it difficult to express many of the important stages in a typical graph-analytics pipeline:
-constructing the graph, modifying its structure, or expressing computation that
-spans multiple graphs. As a consequence, existing graph analytics pipelines
-compose graph-parallel and data-parallel systems, leading to extensive data
-movement and duplication and a complicated programming model.
+However, the same restrictions that enable these substantial performance gains also make it
+difficult to express many of the important stages in a typical graph-analytics pipeline:
+constructing the graph, modifying its structure, or expressing computation that spans multiple
+graphs. Furthermore, how we look at data depends on our objectives and the same raw data may have
+many different table and graph views.
+
+<p style="text-align: center;">
+ <img src="img/tables_and_graphs.png"
+ title="Tables and Graphs"
+ alt="Tables and Graphs"
+ width="50%" />
+ <!-- Images are downsized intentionally to improve quality on retina displays -->
+</p>
+
+As a consequence, it is often necessary to be able to move between table and graph views of the same
+physical data and to leverage the properties of each view to easily and efficiently express
+computation. However, existing graph analytics pipelines must compose graph-parallel and
+data-parallel systems, leading to extensive data movement and duplication and a complicated
+programming model.
<p style="text-align: center;">
<img src="img/graph_analytics_pipeline.png"
@@ -95,17 +108,20 @@ with user defined objects attached to each vertex and edge. A directed multigra
graph with potentially multiple parallel edges sharing the same source and destination vertex. The
ability to support parallel edges simplifies modeling scenarios where there can be multiple
relationships (e.g., co-worker and friend) between the same vertices. Each vertex is keyed by a
-*unique* 64-bit long identifier (`VertexId`). Similarly, edges have corresponding source and
-destination vertex identifiers. GraphX does not impose any ordering or constraints on the vertex
-identifiers. The property graph is parameterized over the vertex `VD` and edge `ED` types. These
+*unique* 64-bit long identifier (`VertexID`). GraphX does not impose any ordering constraints on
+the vertex identifiers. Similarly, edges have corresponding source and destination vertex
+identifiers.
+
+The property graph is parameterized over the vertex (`VD`) and edge (`ED`) types. These
are the types of the objects associated with each vertex and edge respectively.
-> GraphX optimizes the representation of `VD` and `ED` when they are plain old data-types (e.g.,
-> int, double, etc...) reducing the in memory footprint.
+> GraphX optimizes the representation of vertex and edge types when they are plain old data-types
+> (e.g., int, double, etc...) reducing the in memory footprint by storing them in specialized
+> arrays.
-In some cases we may wish to have vertices with different property types in the same graph. This can
-be accomplished through inheritance. For example to model users and products as a bipartite graph
-we might do the following:
+In some cases it may be desirable to have vertices with different property types in the same graph.
+This can be accomplished through inheritance. For example to model users and products as a
+bipartite graph we might do the following:
{% highlight scala %}
class VertexProperty()
@@ -116,9 +132,11 @@ var graph: Graph[VertexProperty, String] = null
{% endhighlight %}
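For illustration, one way the bipartite user/product hierarchy described above might look (the case classes here are illustrative, not taken from the patch):

{% highlight scala %}
class VertexProperty()
// Hypothetical subclasses for the two sides of the bipartite graph
case class UserProperty(name: String) extends VertexProperty
case class ProductProperty(name: String, price: Double) extends VertexProperty
// The graph can then carry either property type at its vertices:
var graph: Graph[VertexProperty, String] = null
{% endhighlight %}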
Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or
-structure of the graph are accomplished by producing a new graph with the desired changes. The graph
-is partitioned across the workers using a range of vertex-partitioning heuristics. As with RDDs,
-each partition of the graph can be recreated on a different machine in the event of a failure.
+structure of the graph are accomplished by producing a new graph with the desired changes. Note
+that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices)
+are reused in the new graph reducing the cost of this inherently functional data-structure. The
+graph is partitioned across the workers using a range of vertex-partitioning heuristics. As with
+RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the
properties for each vertex and edge. As a consequence, the graph class contains members to access
@@ -131,12 +149,12 @@ class Graph[VD, ED] {
}
{% endhighlight %}
-The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexId,
+The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide additional
functionality built around graph computation and leverage internal optimizations. We discuss the
`VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
-`RDD[(VertexId, VD)]` and `RDD[Edge[ED]]`.
+`RDD[(VertexID, VD)]` and `RDD[Edge[ED]]`.
### Example Property Graph
@@ -168,7 +186,7 @@ code constructs a graph from a collection of RDDs:
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
-val users: RDD[(VertexID, (String, String))] =
+val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
@@ -183,7 +201,7 @@ val graph = Graph(users, relationships, defaultUser)
In the above example we make use of the [`Edge`][Edge] case class. Edges have a `srcId` and a
`dstId` corresponding to the source and destination vertex identifiers. In addition, the `Edge`
-class contains the `attr` member which contains the edge property.
+class has an `attr` member which stores the edge property.
[Edge]: api/graphx/index.html#org.apache.spark.graphx.Edge
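As a small illustration of those fields (the values here are made up):

{% highlight scala %}
import org.apache.spark.graphx.Edge

val e = Edge(3L, 7L, "collab")     // srcId, dstId, attr
val endpoints = (e.srcId, e.dstId) // (3, 7)
val relationship = e.attr          // "collab"
{% endhighlight %}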
@@ -199,7 +217,7 @@ graph.edges.filter(e => e.srcId > e.dstId).count
{% endhighlight %}
> Note that `graph.vertices` returns a `VertexRDD[(String, String)]` which extends
-> `RDD[(VertexId, (String, String))]` and so we use the scala `case` expression to deconstruct the
+> `RDD[(VertexID, (String, String))]` and so we use the scala `case` expression to deconstruct the
> tuple. On the other hand, `graph.edges` returns an `EdgeRDD` containing `Edge[String]` objects.
> We could have also used the case class type constructor as in the following:
> {% highlight scala %}
@@ -266,6 +284,75 @@ able to support different graph representations in the future. Each graph repre
provide implementations of the core operations and reuse many of the useful operations defined in
[`GraphOps`][GraphOps].
+### Summary List of Operators
+The following is a quick summary of the functionality defined in both [`Graph`][Graph] and
+[`GraphOps`][GraphOps] but presented as members of Graph for simplicity. Note that some function
+signatures have been simplified (e.g., default arguments and type constraints removed) and some more
+advanced functionality has been removed, so please consult the API docs for the official list of
+operations.
+
+{% highlight scala %}
+/** Summary of the functionality in the property graph */
+class Graph[VD, ED] {
+ // Information about the Graph ===================================================================
+ val numEdges: Long
+ val numVertices: Long
+ val inDegrees: VertexRDD[Int]
+ val outDegrees: VertexRDD[Int]
+ val degrees: VertexRDD[Int]
+ // Views of the graph as collections =============================================================
+ val vertices: VertexRDD[VD]
+ val edges: EdgeRDD[ED]
+ val triplets: RDD[EdgeTriplet[VD, ED]]
+ // Functions for caching graphs ==================================================================
+ def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
+ def cache(): Graph[VD, ED]
+ def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
+ // Change the partitioning heuristic ============================================================
+ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
+ // Transform vertex and edge attributes ==========================================================
+ def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+ def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
+ def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
+ def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
+ def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
+ : Graph[VD, ED2]
+ // Modify the graph structure ====================================================================
+ def reverse: Graph[VD, ED]
+ def subgraph(
+ epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+ vpred: (VertexID, VD) => Boolean = ((v, d) => true))
+ : Graph[VD, ED]
+ def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
+ def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
+ // Join RDDs with the graph ======================================================================
+ def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
+ def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
+ (mapFunc: (VertexID, VD, Option[U]) => VD2)
+ : Graph[VD2, ED]
+ // Aggregate information about adjacent triplets =================================================
+ def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
+ def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
+ def mapReduceTriplets[A: ClassTag](
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ reduceFunc: (A, A) => A,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
+ : VertexRDD[A]
+ // Iterative graph-parallel computation ==========================================================
+ def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
+ vprog: (VertexID, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+ mergeMsg: (A, A) => A)
+ : Graph[VD, ED]
+ // Basic graph algorithms ========================================================================
+ def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
+ def connectedComponents(): Graph[VertexID, ED]
+ def triangleCount(): Graph[Int, ED]
+ def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
+}
+{% endhighlight %}
+
+
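As a rough illustration of a few of the members listed above (assuming the example property graph constructed earlier in this guide):

{% highlight scala %}
// Basic counts and degree information
val numV = graph.numVertices
val degrees: VertexRDD[Int] = graph.degrees
// Structural and transforming operators compose naturally
val reversed = graph.reverse
val labeled = reversed.mapVertices((id, attr) => s"vertex-$id")
{% endhighlight %}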
## Property Operators
In direct analogy to the RDD `map` operator, the property
@@ -273,7 +360,7 @@ graph contains the following:
{% highlight scala %}
class Graph[VD, ED] {
- def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+ def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
@@ -295,7 +382,7 @@ val newGraph = Graph(newVertices, graph.edges)
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
{% endhighlight %}
-[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexID,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
+[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
These operators are often used to initialize the graph for a particular computation or project away
unnecessary properties. For example, given a graph with the out-degrees as the vertex properties
@@ -321,7 +408,7 @@ add more in the future. The following is a list of the basic structural operato
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
- vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
+ vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
@@ -340,11 +427,11 @@ satisfy the edge predicate *and connect vertices that satisfy the vertex predica
operator can be used in a number of situations to restrict the graph to the vertices and edges of
interest or eliminate broken links. For example in the following code we remove broken links:
-[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexID,VD)⇒Boolean):Graph[VD,ED]
+[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]
{% highlight scala %}
// Create an RDD for the vertices
-val users: RDD[(VertexID, (String, String))] =
+val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
@@ -407,9 +494,9 @@ using the *join* operators. Below we list the key join operators:
{% highlight scala %}
class Graph[VD, ED] {
- def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
+ def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
- def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
+ def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
{% endhighlight %}
@@ -419,13 +506,13 @@ returns a new graph with the vertex properties obtained by applying the user def
to the result of the joined vertices. Vertices without a matching value in the RDD retain their
original value.
-[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexID,U)])((VertexID,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
+[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
> Note that if the RDD contains more than one value for a given vertex only one will be used. It
> is therefore recommended that the input RDD be first made unique using the following which will
> also *pre-index* the resulting values to substantially accelerate the subsequent join.
> {% highlight scala %}
-val nonUniqueCosts: RDD[(VertexId, Double)]
+val nonUniqueCosts: RDD[(VertexID, Double)]
val uniqueCosts: VertexRDD[Double] =
graph.vertices.aggregateUsingIndex(nonUniqueCosts, (a, b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
@@ -438,7 +525,7 @@ property type. Because not all vertices may have a matching value in the input
function takes an `Option` type. For example, we can set up a graph for PageRank by initializing
vertex properties with their `outDegree`.
-[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexID,U)])((VertexID,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
+[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
{% highlight scala %}
@@ -457,7 +544,7 @@ val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt)
> provide type annotation for the user defined function:
> {% highlight scala %}
val joinedGraph = graph.joinVertices(uniqueCosts,
- (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
+ (id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)
{% endhighlight %}
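A sketch of the `outDegree` initialization described above, mirroring the `degreeGraph` snippet this hunk touches:

{% highlight scala %}
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt.getOrElse(0) // vertices with no outgoing edges get degree 0
}
{% endhighlight %}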
@@ -472,7 +559,7 @@ PageRank Value, shortest path to the source, and smallest reachable vertex id).
### Map Reduce Triplets (mapReduceTriplets)
<a name="mrTriplets"></a>
-[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
+[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexId,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
The core (heavily optimized) aggregation primitive in GraphX is the
[`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
@@ -480,7 +567,7 @@ The core (heavily optimized) aggregation primitive in GraphX is the
{% highlight scala %}
class Graph[VD, ED] {
def mapReduceTriplets[A](
- map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduce: (A, A) => A)
: VertexRDD[A]
}
@@ -495,26 +582,26 @@ containing the aggregate message (of type `A`) destined to each vertex. Vertice
receive a message are not included in the returned `VertexRDD`.
<blockquote>
-<p>
-Note that <code>mapReduceTriplets</code> takes an additional optional <code>activeSet</code>
-(see API docs) which restricts the map phase to edges adjacent to the vertices in the provided
-<code>VertexRDD</code>:
-</p>
+
+<p>Note that <code>mapReduceTriplets</code> takes an additional optional <code>activeSet</code>
+(not shown above; see the API docs for details) which restricts the map phase to edges adjacent to the
+vertices in the provided <code>VertexRDD</code>:</p>
+
{% highlight scala %}
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None
{% endhighlight %}
-<p>
-The EdgeDirection specifies which edges adjacent to the vertex set are included in the map phase. If
-the direction is <code>In</code>, <code>mapFunc</code> will only be run only on edges with
-destination in the active set. If the direction is <code>Out</code>, <code>mapFunc</code> will only
-be run only on edges originating from vertices in the active set. If the direction is
-<code>Either</code>, <code>mapFunc</code> will be run only on edges with <i>either</i> vertex in the
-active set. If the direction is <code>Both</code>, <code>mapFunc</code> will be run only on edges
-with both vertices in the active set. The active set must be derived from the set of vertices in
-the graph. Restricting computation to triplets adjacent to a subset of the vertices is often
-necessary in incremental iterative computation and is a key part of the GraphX implementation of
-Pregel.
-</p>
+
+<p>The EdgeDirection specifies which edges adjacent to the vertex set are included in the map
+phase. If the direction is <code>In</code>, then the user defined <code>map</code> function will
+only be run on edges with the destination vertex in the active set. If the direction is
+<code>Out</code>, then the <code>map</code> function will only be run on edges originating from
+vertices in the active set. If the direction is <code>Either</code>, then the <code>map</code>
+function will be run only on edges with <i>either</i> vertex in the active set. If the direction is
+<code>Both</code>, then the <code>map</code> function will be run only on edges with both vertices
+in the active set. The active set must be derived from the set of vertices in the graph.
+Restricting computation to triplets adjacent to a subset of the vertices is often necessary in
+incremental iterative computation and is a key part of the GraphX implementation of Pregel. </p>
+
</blockquote>
In the following example we use the `mapReduceTriplets` operator to compute the average age of the
@@ -547,8 +634,8 @@ val avgAgeOfOlderFollowers: VertexRDD[Double] =
avgAgeOfOlderFollowers.collect.foreach(println(_))
{% endhighlight %}
-> Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums)
-> are constant sized (e.g., floats and addition instead of lists and concatenation). More
+> Note that the `mapReduceTriplets` operation performs optimally when the messages (and the sums of
+> messages) are constant sized (e.g., floats and addition instead of lists and concatenation). More
> precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each
> vertex.
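In that spirit, a minimal sketch that keeps messages constant-sized (a single `Int` per edge, summed at the destination):

{% highlight scala %}
// Count in-neighbors: send the message 1 along each edge, then sum per vertex.
val inNbrCounts = graph.mapReduceTriplets[Int](
  triplet => Iterator((triplet.dstId, 1)), // map: one constant-sized message per edge
  (a, b) => a + b                          // reduce: commutative, associative sum
)
{% endhighlight %}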
@@ -562,13 +649,13 @@ compute the max in, out, and total degrees:
{% highlight scala %}
// Define a reduce operation to compute the highest degree vertex
-def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = {
+def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
-val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max)
-val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max)
-val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max)
+val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
+val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
+val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
{% endhighlight %}
### Collecting Neighbors
@@ -578,14 +665,14 @@ attributes at each vertex. This can be easily accomplished using the
[`collectNeighborIds`][GraphOps.collectNeighborIds] and the
[`collectNeighbors`][GraphOps.collectNeighbors] operators.
-[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]]
-[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]]
+[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]]
+[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]
{% highlight scala %}
class GraphOps[VD, ED] {
- def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
- def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
+ def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
+ def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
{% endhighlight %}
@@ -593,11 +680,20 @@ class GraphOps[VD, ED] {
> substantial communication. If possible try expressing the same computation using the
> `mapReduceTriplets` operator directly.
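For completeness, a short illustrative call (keeping the communication caveat above in mind):

{% highlight scala %}
import org.apache.spark.graphx.EdgeDirection

// Gather the ids of each vertex's in-neighbors into an array per vertex.
val inNbrIds = graph.collectNeighborIds(EdgeDirection.In)
{% endhighlight %}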
+## Caching and Uncaching
+
+In Spark, RDDs are not persisted in memory by default. To avoid recomputation, they must be explicitly cached when using them multiple times (see the [Spark Programming Guide][RDD Persistence]). Graphs in GraphX behave the same way. **When using a graph multiple times, make sure to call [`Graph.cache()`][Graph.cache] on it first.**
+
+[RDD Persistence]: scala-programming-guide.html#rdd-persistence
+[Graph.cache]: api/graphx/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]
+
+In iterative computations, *uncaching* may also be necessary for best performance. By default, cached RDDs and graphs will remain in memory until memory pressure forces them to be evicted in LRU order. For iterative computation, intermediate results from previous iterations will fill up the cache. Though they will eventually be evicted, the unnecessary data stored in memory will slow down garbage collection. It would be more efficient to uncache intermediate results as soon as they are no longer necessary. This involves materializing (caching and forcing) a graph or RDD every iteration, uncaching all other datasets, and only using the materialized dataset in future iterations. However, because graphs are composed of multiple RDDs, it can be difficult to unpersist them correctly. **For iterative computation we recommend using the Pregel API, which correctly unpersists intermediate results.**
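A rough sketch of the caching recommendation (the HDFS path is hypothetical):

{% highlight scala %}
import org.apache.spark.graphx.GraphLoader

// Cache once, reuse across two separate computations.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt").cache()
val ranks = graph.pageRank(0.001).vertices
val components = graph.connectedComponents().vertices
{% endhighlight %}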
+
# Pregel API
<a name="pregel"></a>
Graphs are inherently recursive data-structures as properties of vertices depend on properties of
-their neighbors which intern depend on properties of *their* neighbors. As a
+their neighbors which in turn depend on properties of *their* neighbors. As a
consequence many important graph algorithms iteratively recompute the properties of each vertex
until a fixed-point condition is reached. A range of graph-parallel abstractions have been proposed
to express these iterative algorithms. GraphX exposes a Pregel-like operator which is a fusion of
@@ -620,7 +716,7 @@ messages remaining.
The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
of its implementation (note calls to graph.cache have been removed):
-[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexID,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
+[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
{% highlight scala %}
class GraphOps[VD, ED] {
@@ -628,8 +724,8 @@ class GraphOps[VD, ED] {
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
- (vprog: (VertexID, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ (vprog: (VertexId, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// Receive the initial message at each vertex
@@ -674,7 +770,7 @@ import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Int, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
-val sourceId: VertexID = 42 // The ultimate source
+val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
@@ -721,7 +817,7 @@ It creates a `Graph` from the specified edges, automatically creating any vertic
{% highlight scala %}
object Graph {
def apply[VD, ED](
- vertices: RDD[(VertexID, VD)],
+ vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null)
: Graph[VD, ED]
@@ -731,7 +827,7 @@ object Graph {
defaultValue: VD): Graph[VD, ED]
def fromEdgeTuples[VD](
- rawEdges: RDD[(VertexID, VertexID)],
+ rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
@@ -747,8 +843,8 @@ object Graph {
[PartitionStrategy]: api/graphx/index.html#org.apache.spark.graphx.PartitionStrategy$
[GraphLoader.edgeListFile]: api/graphx/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]
-[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexID,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
-[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexID,VertexID)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
+[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
+[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
[Graph.fromEdges]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
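For example, a sketch using the edge-list loader referenced above (the file path is hypothetical; each line holds a source and destination id):

{% highlight scala %}
import org.apache.spark.graphx.GraphLoader

// Builds a Graph[Int, Int] with vertex and edge attributes defaulting to 1.
val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
{% endhighlight %}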
# Vertex and Edge RDDs
@@ -761,47 +857,46 @@ respectively. In this section we review some of the additional useful functiona
## VertexRDDs
-The `VertexRDD[A]` extends the more traditional `RDD[(VertexId, A)]` but adds the additional
-constraint that each `VertexId` occurs only *once*. Moreover, `VertexRDD[A]` represents a *set* of
-vertices each with an attribute of type `A`. Internally, this is achieved by storing the vertex
-attributes in a reusable hash-map data-structure. As a consequence if two `VertexRDD`s are derived
-from the same base `VertexRDD` (e.g., by `filter` or `mapValues`) they can be joined in constant
-time without hash evaluations. To leverage this indexed data-structure, the `VertexRDD` exposes the
-following additional functionality:
+The `VertexRDD[A]` extends `RDD[(VertexID, A)]` and adds the additional constraint that each
+`VertexID` occurs only *once*. Moreover, `VertexRDD[A]` represents a *set* of vertices each with an
+attribute of type `A`. Internally, this is achieved by storing the vertex attributes in a reusable
+hash-map data-structure. As a consequence if two `VertexRDD`s are derived from the same base
+`VertexRDD` (e.g., by `filter` or `mapValues`) they can be joined in constant time without hash
+evaluations. To leverage this indexed data-structure, the `VertexRDD` exposes the following
+additional functionality:
{% highlight scala %}
-class VertexRDD[VD] {
+class VertexRDD[VD] extends RDD[(VertexID, VD)] {
// Filter the vertex set but preserves the internal index
- def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
+ def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
- def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
+ def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
- def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
- def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
+ def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
+ def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
- def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+ def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
{% endhighlight %}
Notice, for example, how the `filter` operator returns a `VertexRDD`. Filter is actually
implemented using a `BitSet` thereby reusing the index and preserving the ability to do fast joins
with other `VertexRDD`s. Likewise, the `mapValues` operators do not allow the `map` function to
-change the `VertexId` thereby enabling the same `HashMap` data-structures to be reused. Both the
+change the `VertexID` thereby enabling the same `HashMap` data-structures to be reused. Both the
`leftJoin` and `innerJoin` are able to identify when joining two `VertexRDD`s derived from the same
`HashMap` and implement the join by linear scan rather than costly point lookups.
-The `aggregateUsingIndex` operator can be slightly confusing but is also useful for efficient
-construction of a new `VertexRDD` from an `RDD[(VertexId, A)]`. Conceptually, if I have constructed
-a `VertexRDD[B]` over a set of vertices, *which is a super-set* of the vertices in some
-`RDD[(VertexId, A)]` then I can reuse the index to both aggregate and then subsequently index the
-RDD. For example:
+The `aggregateUsingIndex` operator is useful for efficient construction of a new `VertexRDD` from an
+`RDD[(VertexID, A)]`. Conceptually, if I have constructed a `VertexRDD[B]` over a set of vertices,
+*which is a super-set* of the vertices in some `RDD[(VertexID, A)]` then I can reuse the index to
+both aggregate and then subsequently index the `RDD[(VertexID, A)]`. For example:
{% highlight scala %}
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
-val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
+val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
@@ -813,10 +908,10 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
## EdgeRDDs
-The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` is considerably simpler than the `VertexRDD`.
-GraphX organizes the edges in blocks partitioned using one of the various partitioning strategies
-defined in [`PartitionStrategy`][PartitionStrategy]. Within each partition, edge attributes and
-adjacency structure, are stored separately enabling maximum reuse when changing attribute values.
+The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
+of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy]. Within
+each partition, edge attributes and adjacency structure are stored separately, enabling maximum
+reuse when changing attribute values.
[PartitionStrategy]: api/graphx/index.html#org.apache.spark.graphx.PartitionStrategy
@@ -827,11 +922,11 @@ def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
{% endhighlight %}
In most applications we have found that operations on the `EdgeRDD` are accomplished through the
-graph or rely on operations defined in the base `RDD` class.
+graph operators or rely on operations defined in the base `RDD` class.
# Optimized Representation
@@ -853,7 +948,9 @@ reduce both the communication and storage overhead. Logically, this corresponds
to machines and allowing vertices to span multiple machines. The exact method of assigning edges
depends on the [`PartitionStrategy`][PartitionStrategy] and there are several tradeoffs to the
various heuristics. Users can choose between different strategies by repartitioning the graph with
-the [`Graph.partitionBy`][Graph.partitionBy] operator.
+the [`Graph.partitionBy`][Graph.partitionBy] operator. The default partitioning strategy is to use
+the initial partitioning of the edges as provided on graph construction. However, users can easily
+switch to 2D-partitioning or other heuristics included in GraphX.
[Graph.partitionBy]: api/graphx/index.html#org.apache.spark.graphx.Graph$@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy):org.apache.spark.graphx.Graph[VD,ED]
@@ -867,16 +964,15 @@ the [`Graph.partitionBy`][Graph.partitionBy] operator.
Once the edges have been partitioned, the key challenge to efficient graph-parallel computation is
efficiently joining vertex attributes with the edges. Because real-world graphs typically have more
-edges than vertices, we move vertex attributes to the edges.
-
-
-
-
+edges than vertices, we move vertex attributes to the edges. Because not all partitions will
+contain edges adjacent to all vertices, we internally maintain a routing table which identifies where
+to broadcast vertices when implementing the join required for operations like `triplets` and
+`mapReduceTriplets`.
# Graph Algorithms
<a name="graph_algorithms"></a>
-GraphX includes a set of graph algorithms in to simplify analytics. The algorithms are contained in the `org.apache.spark.graphx.lib` package and can be accessed directly as methods on `Graph` via [`GraphOps`][GraphOps]. This section describes the algorithms and how they are used.
+GraphX includes a set of graph algorithms to simplify analytics tasks. The algorithms are contained in the `org.apache.spark.graphx.lib` package and can be accessed directly as methods on `Graph` via [`GraphOps`][GraphOps]. This section describes the algorithms and how they are used.
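As a quick illustration of this calling pattern (assuming a graph already built as in the earlier examples):

{% highlight scala %}
// PageRank and connected components invoked directly as methods on Graph.
val ranks = graph.pageRank(0.0001).vertices
val cc = graph.connectedComponents().vertices
{% endhighlight %}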
## PageRank
<a name="pagerank"></a>
@@ -953,13 +1049,6 @@ val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =
println(triCountByUsername.collect().mkString("\n"))
{% endhighlight %}
-<p style="text-align: center;">
- <img src="img/tables_and_graphs.png"
- title="Tables and Graphs"
- alt="Tables and Graphs"
- width="50%" />
- <!-- Images are downsized intentionally to improve quality on retina displays -->
-</p>
# Examples
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 5be8ce1ebe..0cc5505b50 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -21,7 +21,8 @@ depends on native Fortran routines. You may need to install the
if it is not already present on your nodes. MLlib will throw a linking error if it cannot
detect these libraries automatically.
-To use MLlib in Python, you will also need [NumPy](http://www.numpy.org) version 1.7 or newer.
+To use MLlib in Python, you will need [NumPy](http://www.numpy.org) version 1.7 or newer
+and Python 2.7.
# Binary Classification
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index b07899c2e1..7c5283fb0b 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -52,7 +52,7 @@ In addition, PySpark fully supports interactive use---simply run `./bin/pyspark`
# Installing and Configuring PySpark
-PySpark requires Python 2.7 or higher.
+PySpark requires Python 2.6 or higher.
PySpark applications are executed using a standard CPython interpreter in order to support Python modules that use C extensions.
We have not tested PySpark with Python 3 or with alternative Python interpreters, such as [PyPy](http://pypy.org/) or [Jython](http://www.jython.org/).
@@ -152,7 +152,7 @@ Many of the methods also contain [doctests](http://docs.python.org/2/library/doc
# Libraries
[MLlib](mllib-guide.html) is also available in PySpark. To use it, you'll need
-[NumPy](http://www.numpy.org) version 1.7 or newer. The [MLlib guide](mllib-guide.html) contains
+[NumPy](http://www.numpy.org) version 1.7 or newer, and Python 2.7. The [MLlib guide](mllib-guide.html) contains
some example applications.
# Where to Go from Here
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 4e8a680a75..07c4c55633 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -21,6 +21,8 @@ Add the following SBT or Maven dependency to your project to use Spark Streaming
artifactId = spark-streaming_{{site.SCALA_VERSION}}
version = {{site.SPARK_VERSION}}
+For ingesting data from sources like Kafka and Flume, add the corresponding artifact `spark-streaming-xyz_{{site.SCALA_VERSION}}` to the dependencies. For example, `spark-streaming-kafka_{{site.SCALA_VERSION}}` for Kafka, `spark-streaming-flume_{{site.SCALA_VERSION}}` for Flume, etc. Please refer to the [Apache repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION}}%22) for the full list of supported sources / artifacts.
+
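For instance, a rough sbt equivalent of the coordinates above for the Kafka connector (swap the artifact for the source you need):

{% highlight scala %}
// sbt build definition; the placeholders follow the Maven snippet above.
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_{{site.SCALA_VERSION}}" % "{{site.SPARK_VERSION}}"
{% endhighlight %}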
# Initializing Spark Streaming
The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using
@@ -28,26 +30,28 @@ The first thing a Spark Streaming program must do is create a `StreamingContext`
new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
{% endhighlight %}
-The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
+The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such that the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are optional parameters, which need to be set when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
-This constructor creates a SparkContext for your job as well, which can be accessed with `streamingContext.sparkContext`.
+Alternatively, a `StreamingContext` can be created using
+{% highlight scala %}
+new StreamingContext(conf, batchDuration)
+{% endhighlight %}
+where `conf` is a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
+object used for more advanced configuration. In both cases, a [SparkContext](api/core/index.html#org.apache.spark.SparkContext) is created as well which can be accessed with `streamingContext.sparkContext`.
-# Attaching Input Sources - InputDStreams
-The StreamingContext is used to creating InputDStreams from input sources:
+# Attaching Input Sources
+The StreamingContext is used to create input streams from data sources:
{% highlight scala %}
// Assuming ssc is the StreamingContext
-ssc.textFileStream(directory) // Creates a stream by monitoring and processing new files in a HDFS directory
-ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
+ssc.textFileStream(directory)      // Creates a stream that monitors and processes new files in an HDFS directory
+ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
{% endhighlight %}
-We also provide a input streams for Kafka, Flume, Akka actor, etc. For a complete list of input streams, take a look at the [StreamingContext API documentation](api/streaming/index.html#org.apache.spark.streaming.StreamingContext).
-
-
+The core Spark Streaming API provides input streams for files, sockets, and Akka actors. Additional functionality for Kafka, Flume, ZeroMQ, Twitter, etc. can be imported by adding the right dependencies as explained in the [linking](#linking-with-spark-streaming) section.
# DStream Operations
-Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, you'll need to call the output operations, which writies data out to an external source.
+Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, output operations need to be called, which write data out to an external data sink like a file system or a database.
## Transformations
@@ -234,7 +238,7 @@ wordCounts.print()
ssc.start()
{% endhighlight %}
-The `socketTextStream` returns a DStream of lines received from a TCP socket-based source. The `lines` DStream is _transformed_ into a DStream using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second.
+The `socketTextStream` returns a DStream of text data received from a TCP server socket. The `lines` DStream is _transformed_ into a DStream of words using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second.
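For reference, the chain just described corresponds to code along these lines (a sketch; `ssc`, the host, and the port mirror the example):

{% highlight scala %}
import org.apache.spark.streaming.StreamingContext._ // brings in the pair DStream operations

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
{% endhighlight %}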
To run this example on your local machine, you need to first run a Netcat server by using
@@ -270,14 +274,12 @@ hello world
{% highlight bash %}
# TERMINAL 2: RUNNING NetworkWordCount
...
-2012-12-31 18:47:10,446 INFO SparkContext: Job finished: run at ThreadPoolExecutor.java:886, took 0.038817 s
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
-2012-12-31 18:47:10,447 INFO JobManager: Total delay: 0.44700 s for job 8 (execution: 0.44000 s)
...
{% endhighlight %}
</td>
@@ -384,7 +386,7 @@ A system that is required to operate 24/7 needs to be able tolerate the failure
1. The configuration of each DStream (checkpoint interval, etc.)
1. The RDD checkpoint files of each DStream
-All this is periodically saved in the file `<checkpoint directory>/graph`. To recover, a new Streaming Context can be created with this directory by using
+All this is periodically saved in the checkpoint directory. To recover, a new `StreamingContext` can be created with this directory by using
{% highlight scala %}
val ssc = new StreamingContext(checkpointDirectory)
@@ -395,7 +397,7 @@ On calling `ssc.start()` on this new context, the following steps are taken by t
1. Schedule the transformations and output operations for all the time steps between the time when the driver failed and when it last checkpointed. This is also done for those time steps that were previously scheduled but not processed due to the failure. This will make the system recompute all the intermediate data from the checkpointed RDD files, etc.
1. Restart the network receivers, if any, and continue receiving new data.
-In the current _alpha_ release, there are two different failure behaviors based on which input sources are used.
+There are two different failure behaviors based on which input sources are used.
1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can be re-computed and therefore no data will be lost due to any failure.
1. _Using any input source that receives data through a network_ - The received input data is replicated in memory to multiple nodes. Since all the data in the Spark worker's memory is lost when the Spark driver fails, the past input data will not be accessible when the driver recovers. Hence, if stateful and window-based operations are used (like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state will not be recovered completely.
diff --git a/docs/tuning.md b/docs/tuning.md
index bbb870085c..6b010aed61 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -119,8 +119,7 @@ pointer-based data structures and wrapper objects. There are several ways to do
2. Avoid nested structures with a lot of small objects and pointers when possible.
3. Consider using numeric IDs or enumeration objects instead of strings for keys.
4. If you have less than 32 GB of RAM, set the JVM flag `-XX:+UseCompressedOops` to make pointers be
- four bytes instead of eight. Also, on Java 7 or later, try `-XX:+UseCompressedStrings` to store
- ASCII strings as just 8 bits per character. You can add these options in
+ four bytes instead of eight. You can add these options in
[`spark-env.sh`](configuration.html#environment-variables-in-spark-envsh).
## Serialized RDD Storage