author    Matei Zaharia <matei@eecs.berkeley.edu>    2013-08-31 23:01:50 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>    2013-09-01 14:13:16 -0700
commit    5b4dea21439e86b61447bdb1613b2ddff9ffba9f (patch)
tree      b8aff502ccebb71e84c5eff3420436e0c9f3898e
parent    5701eb92c7ac75176e0daebd3d551a07eea63cb5 (diff)
More fixes
-rw-r--r--  docs/bagel-programming-guide.md      27
-rw-r--r--  docs/streaming-custom-receivers.md   40
-rw-r--r--  docs/streaming-programming-guide.md  12
-rw-r--r--  docs/tuning.md                       13
4 files changed, 41 insertions, 51 deletions
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
index f95627865a..583684913d 100644
--- a/docs/bagel-programming-guide.md
+++ b/docs/bagel-programming-guide.md
@@ -9,16 +9,15 @@ In the Pregel programming model, jobs run as a sequence of iterations called _su
This guide shows the programming model and features of Bagel by walking through an example implementation of PageRank on Bagel.
-## Linking with Bagel
+# Linking with Bagel
-To write a Bagel application, you will need to add Spark, its dependencies, and Bagel to your CLASSPATH:
+To use Bagel in your program, add the following SBT or Maven dependency:
-1. Run `sbt/sbt update` to fetch Spark's dependencies, if you haven't already done so.
-2. Run `sbt/sbt assembly` to build Spark and its dependencies into one JAR (`core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar`)
-3. Run `sbt/sbt package` build the Bagel JAR (`bagel/target/scala_{{site.SCALA_VERSION}}/spark-bagel_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar`).
-4. Add these two JARs to your CLASSPATH.
+ groupId = org.apache.spark
+ artifactId = spark-bagel_{{site.SCALA_VERSION}}
+ version = {{site.SPARK_VERSION}}
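For an SBT build, this corresponds to a dependency line like the following (a sketch; adapt it to your build definition):

{% highlight scala %}
// Equivalent SBT dependency for the coordinates above.
libraryDependencies += "org.apache.spark" % "spark-bagel_{{site.SCALA_VERSION}}" % "{{site.SPARK_VERSION}}"
{% endhighlight %}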
-## Programming Model
+# Programming Model
Bagel operates on a graph represented as a [distributed dataset](scala-programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
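To make the shape of this contract concrete, here is a minimal sketch of a compute function in its simple form, using hypothetical `PRVertex` and `PRMessage` case classes rather than the exact types of the full PageRank example:

{% highlight scala %}
// Hypothetical vertex and message types, for illustration only.
case class PRVertex(id: String, rank: Double, outEdges: Seq[String], active: Boolean)
case class PRMessage(targetId: String, value: Double)

// A compute function: (current state, messages from the previous
// superstep, superstep number) => (new state, outgoing messages).
def compute(self: PRVertex, msgs: Option[Array[PRMessage]], superstep: Int)
  : (PRVertex, Array[PRMessage]) = {
  val msgSum = msgs.getOrElse(Array[PRMessage]()).map(_.value).sum
  val newRank = if (superstep > 0) 0.15 + 0.85 * msgSum else self.rank
  val halt = superstep >= 10
  val outbox =
    if (halt) Array[PRMessage]()
    else self.outEdges.map(dest => PRMessage(dest, newRank / self.outEdges.size)).toArray
  (self.copy(rank = newRank, active = !halt), outbox)
}
{% endhighlight %}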
@@ -89,7 +88,7 @@ Finally, we print the results.
println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString)
{% endhighlight %}
-### Combiners
+## Combiners
Sending a message to another vertex generally involves expensive communication over the network. For certain algorithms, it's possible to reduce the amount of communication using _combiners_. For example, if the compute function receives integer messages and only uses their sum, it's possible for Bagel to combine multiple messages to the same vertex by summing them.
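For instance, a sum combiner for the hypothetical `PRMessage` type sketched earlier might look like this (a sketch following the `Combiner` trait shown under Types below):

{% highlight scala %}
// Sums message values destined for the same vertex, so only one
// combined Double crosses the network instead of many messages.
class PRCombiner extends Combiner[PRMessage, Double] with Serializable {
  def createCombiner(msg: PRMessage): Double = msg.value
  def mergeMsg(combiner: Double, msg: PRMessage): Double = combiner + msg.value
  def mergeCombiners(a: Double, b: Double): Double = a + b
}
{% endhighlight %}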
@@ -97,7 +96,7 @@ For combiner support, Bagel can optionally take a set of combiner functions that
_Example: PageRank with combiners_
-### Aggregators
+## Aggregators
Aggregators perform a reduce across all vertices after each superstep, and provide the result to each vertex in the next superstep.
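As a sketch, an aggregator that counts the vertices still active after each superstep might look like the following (again using the hypothetical `PRVertex` from earlier):

{% highlight scala %}
// Reduces over all vertices after each superstep; every vertex
// sees the resulting count in the next superstep.
class ActiveVertexCount extends Aggregator[PRVertex, Int] with Serializable {
  def createAggregator(vert: PRVertex): Int = if (vert.active) 1 else 0
  def mergeAggregators(a: Int, b: Int): Int = a + b
}
{% endhighlight %}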
@@ -105,11 +104,11 @@ For aggregator support, Bagel can optionally take an aggregator function that re
_Example_
-### Operations
+## Operations
Here are the actions and types in the Bagel API. See [Bagel.scala](https://github.com/apache/incubator-spark/blob/master/bagel/src/main/scala/spark/bagel/Bagel.scala) for details.
-#### Actions
+### Actions
{% highlight scala %}
/*** Full form ***/
@@ -133,7 +132,7 @@ Bagel.run(sc, vertices, messages, numSplits)(compute)
// and returns (newVertex: V, outMessages: Array[M])
{% endhighlight %}
-#### Types
+### Types
{% highlight scala %}
trait Combiner[M, C] {
@@ -156,10 +155,10 @@ trait Message[K] {
}
{% endhighlight %}
-## Where to Go from Here
+# Where to Go from Here
Two example jobs, PageRank and shortest path, are included in `examples/src/main/scala/org/apache/spark/examples/bagel`. You can run them by passing the class name to the `run-example` script included in Spark; e.g.:
- ./run-example org.apache.spark.examples.bagel.WikipediaPageRank`
+ ./run-example org.apache.spark.examples.bagel.WikipediaPageRank
Each example program prints usage help when run without any arguments.
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 981cdfe0b2..4e27d6559c 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -1,24 +1,22 @@
---
layout: global
-title: Tutorial - Spark Streaming, Plugging in a custom receiver.
+title: Spark Streaming Custom Receivers
---
A "Spark Streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
This guide shows the programming model and features by walking through a simple sample receiver and a corresponding Spark Streaming application.
-### Write a simple receiver
+### Writing a Simple Receiver
-This starts with implementing [NetworkReceiver](#References)
+This starts with implementing [NetworkReceiver](api/streaming/index.html#org.apache.spark.streaming.dstream.NetworkReceiver).
-Following is a simple socket text-stream receiver.
+The following is a simple socket text-stream receiver.
{% highlight scala %}
-
- class SocketTextStreamReceiver(host: String,
- port: Int
- ) extends NetworkReceiver[String] {
-
+ class SocketTextStreamReceiver(host: String, port: Int)
+ extends NetworkReceiver[String]
+ {
protected lazy val blocksGenerator: BlockGenerator =
new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
@@ -36,23 +34,20 @@ Following is a simple socket text-stream receiver.
protected def onStop() {
blocksGenerator.stop()
}
-
}
-
{% endhighlight %}
All we did here is extend NetworkReceiver and call the blockGenerator's API method (i.e. `+=`) to push our blocks of data. Please refer to the scala-docs of NetworkReceiver for more details.
-### An Actor as Receiver.
+### An Actor as Receiver
This starts with implementing an [Actor](#References).
The following is a simple socket text-stream receiver, kept deliberately simple by using Akka's socket.io API.
{% highlight scala %}
-
class SocketTextStreamReceiver (host:String,
port:Int,
bytesToString: ByteString => String) extends Actor with Receiver {
@@ -64,52 +59,41 @@ Following is a simple socket text-stream receiver, which is appearently overly s
}
}
-
-
{% endhighlight %}
All we did here is mix in the Receiver trait and call the pushBlock API method to push our blocks of data. Please refer to the scala-docs of Receiver for more details.
-### A sample spark application
+### A Sample Spark Application
* First, create a Spark Streaming context with the master URL and batch duration.
{% highlight scala %}
-
val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
Seconds(batchDuration))
-
{% endhighlight %}
* Plug the custom receiver into the Spark Streaming context and create a DStream.
{% highlight scala %}
-
val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
"localhost", 8445))
-
{% endhighlight %}
* Or, plug the actor in as a receiver into the Spark Streaming context and create a DStream.
{% highlight scala %}
-
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
-
{% endhighlight %}
* Process it.
{% highlight scala %}
-
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
-
-
{% endhighlight %}
* After processing it, the stream can be tested using the netcat utility.
@@ -119,12 +103,11 @@ All we did here is mixed in trait Receiver and called pushBlock api method to pu
hello hello
-## Multiple homogeneous/heterogeneous receivers.
+## Multiple Homogeneous/Heterogeneous Receivers
A DStream union operation is provided for taking the union of multiple input streams.
{% highlight scala %}
-
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
@@ -133,7 +116,6 @@ A DStream union operation is provided for taking union on multiple input streams
"localhost",8446, z => z.utf8String)),"SocketReceiver")
val union = lines.union(lines2)
-
{% endhighlight %}
The above stream can then be easily processed as described earlier.
@@ -143,4 +125,4 @@ _A more comprehensive example is provided in the spark streaming examples_
## References
1. [Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
-2.[NetworkReceiver](http://spark.incubator.apache.org/docs/latest/api/streaming/index.html#spark.streaming.dstream.NetworkReceiver)
+2. [NetworkReceiver](api/streaming/index.html#org.apache.spark.streaming.dstream.NetworkReceiver)
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index bc2f4f884f..c7df172024 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -13,6 +13,14 @@ A Spark Streaming application is very similar to a Spark application; it consist
This guide shows how to start programming with DStreams.
+# Linking with Spark Streaming
+
+Add the following SBT or Maven dependency to your project to use Spark Streaming:
+
+ groupId = org.apache.spark
+ artifactId = spark-streaming_{{site.SCALA_VERSION}}
+ version = {{site.SPARK_VERSION}}
+
# Initializing Spark Streaming
The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using
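One common form, as a hedged sketch (the master URL, application name, and batch duration below are placeholders):

{% highlight scala %}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A StreamingContext on a local master with two threads and 1-second batches.
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
{% endhighlight %}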
@@ -301,8 +309,8 @@ dstream.checkpoint(checkpointInterval) // checkpointInterval must be a multiple
For DStreams that must be checkpointed (that is, DStreams created by `updateStateByKey` and `reduceByKeyAndWindow` with inverse function), the checkpoint interval of the DStream is by default set to a multiple of the DStream's sliding interval that is at least 10 seconds.
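As a sketch, assuming a hypothetical stateful `wordCounts` DStream with a 10-second sliding interval:

{% highlight scala %}
// The checkpoint interval must be a multiple of the DStream's
// sliding interval; 30 seconds satisfies the 10-second minimum here.
wordCounts.checkpoint(Seconds(30))
{% endhighlight %}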
-## Customizing Receiver
-Spark comes with a built in support for most common usage scenarios where input stream source can be either a network socket stream to support for a few message queues. Apart from that it is also possible to supply your own custom receiver via a convenient API. Find more details at [Custom Receiver Guide](streaming-custom-receivers.html)
+## Custom Receivers
+Spark comes with built-in support for the most common usage scenarios, where the input stream source is a network socket stream or one of a few message queues. Beyond that, it is also possible to supply your own custom receiver via a convenient API. Find more details in the [Custom Receiver Guide](streaming-custom-receivers.html).
# Performance Tuning
Getting the best performance out of a Spark Streaming application on a cluster requires a bit of tuning. This section explains a number of the parameters and configurations that can be tuned to improve the performance of your application. At a high level, you need to consider two things:
diff --git a/docs/tuning.md b/docs/tuning.md
index 5ffca54481..3563d110c9 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -32,24 +32,25 @@ in your operations) and performance. It provides two serialization libraries:
[`java.io.Externalizable`](http://docs.oracle.com/javase/6/docs/api/java/io/Externalizable.html).
Java serialization is flexible but often quite slow, and leads to large
serialized formats for many classes.
-* [Kryo serialization](http://code.google.com/p/kryo/wiki/V1Documentation): Spark can also use
+* [Kryo serialization](http://code.google.com/p/kryo/): Spark can also use
the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly
faster and more compact than Java serialization (often as much as 10x), but does not support all
`Serializable` types and requires you to *register* the classes you'll use in the program in advance
for best performance.
-You can switch to using Kryo by calling `System.setProperty("spark.serializer", "spark.KryoSerializer")`
+You can switch to using Kryo by calling `System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer")`
*before* creating your SparkContext. The only reason it is not the default is the custom
registration requirement, but we recommend trying it in any network-intensive application.
Finally, to register your classes with Kryo, create a public class that extends
-[`spark.KryoRegistrator`](api/core/index.html#spark.KryoRegistrator) and set the
+[`org.apache.spark.KryoRegistrator`](api/core/index.html#org.apache.spark.KryoRegistrator) and set the
`spark.kryo.registrator` system property to point to it, as follows:
{% highlight scala %}
import com.esotericsoftware.kryo.Kryo
+import org.apache.spark.KryoRegistrator
-class MyRegistrator extends spark.KryoRegistrator {
+class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[MyClass1])
kryo.register(classOf[MyClass2])
@@ -57,7 +58,7 @@ class MyRegistrator extends spark.KryoRegistrator {
}
// Make sure to set these properties *before* creating a SparkContext!
-System.setProperty("spark.serializer", "spark.KryoSerializer")
+System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext(...)
{% endhighlight %}
@@ -216,7 +217,7 @@ enough. Spark automatically sets the number of "map" tasks to run on each file a
(though you can control it through optional parameters to `SparkContext.textFile`, etc), and for
distributed "reduce" operations, such as `groupByKey` and `reduceByKey`, it uses the largest
parent RDD's number of partitions. You can pass the level of parallelism as a second argument
-(see the [`spark.PairRDDFunctions`](api/core/index.html#spark.PairRDDFunctions) documentation),
+(see the [`org.apache.spark.PairRDDFunctions`](api/core/index.html#org.apache.spark.PairRDDFunctions) documentation),
or set the system property `spark.default.parallelism` to change the default.
In general, we recommend 2-3 tasks per CPU core in your cluster.
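As a hedged illustration (`pairs` and the figure of 48 partitions are hypothetical):

{% highlight scala %}
// `pairs` stands for some RDD of key-value pairs.
// Raise the reduce-side parallelism via the optional second argument...
val counts = pairs.reduceByKey(_ + _, 48)

// ...or set a cluster-wide default *before* creating the SparkContext.
System.setProperty("spark.default.parallelism", "48")
{% endhighlight %}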