-rw-r--r--  docs/custom-streaming-receiver.md (renamed from docs/plugin-custom-receiver.md) |   0
-rw-r--r--  docs/streaming-custom-receivers.md                                              | 101
-rw-r--r--  docs/streaming-programming-guide.md                                             |   6
-rw-r--r--  pom.xml                                                                         |   6
-rw-r--r--  project/SparkBuild.scala                                                        |   5
-rwxr-xr-x  run                                                                             |   5
-rw-r--r--  run2.cmd                                                                        |   3
-rw-r--r--  streaming/pom.xml                                                               |  11
8 files changed, 113 insertions, 24 deletions
diff --git a/docs/plugin-custom-receiver.md b/docs/custom-streaming-receiver.md
index 0eb4246158..0eb4246158 100644
--- a/docs/plugin-custom-receiver.md
+++ b/docs/custom-streaming-receiver.md
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
new file mode 100644
index 0000000000..0eb4246158
--- /dev/null
+++ b/docs/streaming-custom-receivers.md
@@ -0,0 +1,101 @@
+---
+layout: global
+title: Tutorial - Spark Streaming, Plugging in a Custom Receiver
+---
+
+A "Spark streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
+
+This guide shows the programming model and features by walking through a simple sample receiver and a corresponding Spark Streaming application.
+
+
+## A quick and naive walk-through
+
+### Write a simple receiver
+
+This starts with implementing an [Actor](#References).
+
+The following is a simple socket text-stream receiver, admittedly over-simplified, built on Akka's socket IO API.
+
+{% highlight scala %}
+
+    class SocketTextStreamReceiver(host: String,
+        port: Int,
+        bytesToString: ByteString => String) extends Actor with Receiver {
+
+      // Open the socket connection as soon as the actor starts.
+      override def preStart = IOManager(context.system).connect(host, port)
+
+      // Every chunk read from the socket is pushed into Spark Streaming as a block.
+      def receive = {
+        case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
+      }
+
+    }
+
+
+{% endhighlight %}
+
+All we did here is mix in the `Receiver` trait and call its `pushBlock` method to push our blocks of data into Spark Streaming. Please refer to the Scaladoc of `Receiver` for more details.
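+
+As a small, hypothetical variation on the receiver above (it reuses only the `pushBlock` call and the Akka IO types already shown, and omits imports just like that example does), the incoming bytes could be split so that each line is pushed as its own block:
+
+{% highlight scala %}
+
+    class SocketLineStreamReceiver(host: String, port: Int)
+        extends Actor with Receiver {
+
+      override def preStart = IOManager(context.system).connect(host, port)
+
+      def receive = {
+        case IO.Read(socket, bytes) =>
+          // Decode the chunk and push every non-empty line as a separate block.
+          // (A production receiver would also buffer partial lines across reads.)
+          bytes.utf8String.split("\n").filter(_.nonEmpty).foreach(line => pushBlock(line))
+      }
+
+    }
+
+{% endhighlight %}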
+
+### A sample Spark application
+
+* First, create a Spark Streaming context with the master URL and the batch duration.
+
+{% highlight scala %}
+
+    val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
+      Seconds(batchDuration))
+
+{% endhighlight %}
+
+* Plug the actor configuration into the Spark Streaming context to create a DStream.
+
+{% highlight scala %}
+
+    val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
+      "localhost", 8445, z => z.utf8String)), "SocketReceiver")
+
+{% endhighlight %}
+
+* Process it.
+
+{% highlight scala %}
+
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+
+    wordCounts.print()
+    ssc.start()
+
+
+{% endhighlight %}
+
+* Once the processing is wired up, the stream can be tested by sending data with the netcat utility. Start `nc` before the application so that the receiver has something to connect to.
+
+ $ nc -l localhost 8445
+ hello world
+ hello hello
+
+
+## Multiple homogeneous/heterogeneous receivers
+
+A DStream `union` operation is provided for combining multiple input streams into a single stream.
+
+{% highlight scala %}
+
+    val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
+      "localhost", 8445, z => z.utf8String)), "SocketReceiver")
+
+    // A second socket stream receiver, listening on another port and given a distinct actor name
+    val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
+      "localhost", 8446, z => z.utf8String)), "SocketReceiver2")
+
+    val union = lines.union(lines2)
+
+{% endhighlight %}
+
+The unioned stream above can then be processed exactly as described earlier.
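+
+For instance, the word count from the walk-through applies to the unioned stream unchanged (a sketch reusing only the operations shown earlier):
+
+{% highlight scala %}
+
+    val words = union.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+
+    wordCounts.print()
+    ssc.start()
+
+{% endhighlight %}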
+
+_A more comprehensive example is provided in the Spark Streaming examples._
+
+## References
+
+1. [Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index ded43e67cd..0e618a06c7 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -365,14 +365,14 @@ There are two failure behaviors based on which input sources are used.
Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be the same even if there was a worker node failure. However, output operations (like `foreach`) have _at-least-once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get overwritten by the same data), additional transaction-like mechanisms may be necessary to achieve exactly-once semantics for output operations.
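
As an illustration of such a transaction-like mechanism, here is a hypothetical sketch (not an API provided by Spark Streaming; `IdempotentStore` is an invented stand-in for any sink with overwrite-by-key semantics, and a `wordCounts` DStream like the one built in the earlier examples is assumed): writing every record under a deterministic key makes a re-executed batch overwrite the same entries instead of appending duplicates.

{% highlight scala %}

import scala.collection.mutable

// Invented in-memory sink, for illustration only: any external store that
// overwrites on key collision gives the same effect.
object IdempotentStore {
  private val data = mutable.Map[String, Int]()
  def put(key: String, value: Int): Unit = synchronized { data(key) = value }
}

// The `foreach` output operation may run more than once per batch after a
// worker failure, but re-writing the same keys leaves the store unchanged.
wordCounts.foreach(rdd => {
  rdd.collect().foreach { case (word, count) =>
    IdempotentStore.put(word, count)
  }
})

{% endhighlight %}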
-## Failure of a Driver Node
-A system that is required to operate 24/7 needs to be able tolerate the failure of the drive node as well. Spark Streaming does this by saving the state of the DStream computation periodically to a HDFS file, that can be used to restart the streaming computation in the event of a failure of the driver node. To elaborate, the following state is periodically saved to a file.
+## Failure of the Driver Node
+A system that is required to operate 24/7 needs to be able to tolerate the failure of the driver node as well. Spark Streaming does this by periodically saving the state of the DStream computation to an HDFS file, which can be used to restart the streaming computation in the event of a failure of the driver node. This checkpointing is enabled by setting an HDFS directory for checkpointing using `ssc.checkpoint(<checkpoint directory>)` as described [earlier](#rdd-checkpointing-within-dstreams). To elaborate, the following state is periodically saved to a file.
1. The DStream operator graph (input streams, output streams, etc.)
1. The configuration of each DStream (checkpoint interval, etc.)
1. The RDD checkpoint files of each DStream
-All this is periodically saved in the file `<checkpoint directory>/graph` where `<checkpoint directory>` is the HDFS path set using `ssc.checkpoint(...)` as described earlier. To recover, a new Streaming Context can be created with this directory by using
+All this is periodically saved in the file `<checkpoint directory>/graph`. To recover, a new Streaming Context can be created with this directory by using
{% highlight scala %}
val ssc = new StreamingContext(checkpointDirectory)
diff --git a/pom.xml b/pom.xml
index 7e06cae052..99eb17856a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,9 +84,9 @@
</snapshots>
</repository>
<repository>
- <id>typesafe-repo</id>
- <name>Typesafe Repository</name>
- <url>http://repo.typesafe.com/typesafe/releases/</url>
+ <id>akka-repo</id>
+ <name>Akka Repository</name>
+ <url>http://repo.akka.io/releases/</url>
<releases>
<enabled>true</enabled>
</releases>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 22bdc93602..b0b6e21681 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -114,7 +114,7 @@ object SparkBuild extends Build {
def coreSettings = sharedSettings ++ Seq(
name := "spark-core",
resolvers ++= Seq(
- "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
+ "Akka Repository" at "http://repo.akka.io/releases/",
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
"Spray Repository" at "http://repo.spray.cc/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
@@ -162,9 +162,6 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
- resolvers ++= Seq(
- "Akka Repository" at "http://repo.akka.io/releases"
- ),
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
"com.github.sgroschupf" % "zkclient" % "0.1",
diff --git a/run b/run
index 6b2d84d48d..ecbf7673c6 100755
--- a/run
+++ b/run
@@ -111,14 +111,13 @@ CLASSPATH+=":$FWDIR/conf"
CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes"
if [ -n "$SPARK_TESTING" ] ; then
CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes"
fi
CLASSPATH+=":$CORE_DIR/src/main/resources"
CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
-for jar in `find "$STREAMING_DIR/lib" -name '*jar'`; do
- CLASSPATH+=":$jar"
-done
+CLASSPATH+=":$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar
if [ -e "$FWDIR/lib_managed" ]; then
CLASSPATH+=":$FWDIR/lib_managed/jars/*"
CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
diff --git a/run2.cmd b/run2.cmd
index c913a5195e..705a4d1ff6 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -47,11 +47,14 @@ set CORE_DIR=%FWDIR%core
set REPL_DIR=%FWDIR%repl
set EXAMPLES_DIR=%FWDIR%examples
set BAGEL_DIR=%FWDIR%bagel
+set STREAMING_DIR=%FWDIR%streaming
set PYSPARK_DIR=%FWDIR%python
rem Build up classpath
set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources
+set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\classes;%STREAMING_DIR%\target\scala-%SCALA_VERSION%\test-classes
+set CLASSPATH=%CLASSPATH%;%STREAMING_DIR%\lib\org\apache\kafka\kafka\0.7.2-spark\*
set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\jars\*
set CLASSPATH=%CLASSPATH%;%FWDIR%lib_managed\bundles\*
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 92b17fc3af..15523eadcb 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -20,17 +20,6 @@
<id>lib</id>
<url>file://${project.basedir}/lib</url>
</repository>
- <repository>
- <id>akka-repo</id>
- <name>Akka Repository</name>
- <url>http://repo.akka.io/releases</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
</repositories>
<dependencies>