aboutsummaryrefslogtreecommitdiff
path: root/docs/streaming-programming-guide.md
diff options
context:
space:
mode:
authoradesharatushar <tushar_adeshara@persistent.com>2016-12-29 22:03:34 +0000
committerSean Owen <sowen@cloudera.com>2016-12-29 22:03:34 +0000
commitdba81e1dcdea1e8bd196c88d4810f9a04312acbf (patch)
tree5fbcb789a4ce92c420c330f752fd2367b2b80bef /docs/streaming-programming-guide.md
parent87bc4112c5d766839aaa3876e19dae3a67108265 (diff)
downloadspark-dba81e1dcdea1e8bd196c88d4810f9a04312acbf.tar.gz
spark-dba81e1dcdea1e8bd196c88d4810f9a04312acbf.tar.bz2
spark-dba81e1dcdea1e8bd196c88d4810f9a04312acbf.zip
[SPARK-19003][DOCS] Add Java example in Spark Streaming Guide, section Design Patterns for using foreachRDD
## What changes were proposed in this pull request? Added missing Java example under section "Design Patterns for using foreachRDD". Now this section has examples in all 3 languages, improving consistency of documentation. ## How was this patch tested? Manual. Generated docs using command "SKIP_API=1 jekyll build" and verified generated HTML page manually. The syntax of example has been tested for correctness using sample code on Java1.7 and Spark 2.2.0-SNAPSHOT. Author: adesharatushar <tushar_adeshara@persistent.com> Closes #16408 from adesharatushar/streaming-doc-fix.
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r--docs/streaming-programming-guide.md72
1 files changed, 72 insertions, 0 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 1fcd198685..38b4f78177 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1246,6 +1246,22 @@ dstream.foreachRDD { rdd =>
}
{% endhighlight %}
</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+ @Override
+ public void call(JavaRDD<String> rdd) {
+ final Connection connection = createNewConnection(); // executed at the driver
+ rdd.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String record) {
+ connection.send(record); // executed at the worker
+ }
+ });
+ }
+});
+{% endhighlight %}
+</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendRecord(rdd):
@@ -1279,6 +1295,23 @@ dstream.foreachRDD { rdd =>
}
{% endhighlight %}
</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+ @Override
+ public void call(JavaRDD<String> rdd) {
+ rdd.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String record) {
+ Connection connection = createNewConnection();
+ connection.send(record);
+ connection.close();
+ }
+ });
+ }
+});
+{% endhighlight %}
+</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendRecord(record):
@@ -1309,6 +1342,25 @@ dstream.foreachRDD { rdd =>
}
{% endhighlight %}
</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+ @Override
+ public void call(JavaRDD<String> rdd) {
+ rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
+ @Override
+ public void call(Iterator<String> partitionOfRecords) {
+ Connection connection = createNewConnection();
+ while (partitionOfRecords.hasNext()) {
+ connection.send(partitionOfRecords.next());
+ }
+ connection.close();
+ }
+ });
+ }
+});
+{% endhighlight %}
+</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendPartition(iter):
@@ -1342,6 +1394,26 @@ dstream.foreachRDD { rdd =>
{% endhighlight %}
</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+ @Override
+ public void call(JavaRDD<String> rdd) {
+ rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
+ @Override
+ public void call(Iterator<String> partitionOfRecords) {
+ // ConnectionPool is a static, lazily initialized pool of connections
+ Connection connection = ConnectionPool.getConnection();
+ while (partitionOfRecords.hasNext()) {
+ connection.send(partitionOfRecords.next());
+ }
+ ConnectionPool.returnConnection(connection); // return to the pool for future reuse
+ }
+ });
+ }
+});
+{% endhighlight %}
+</div>
<div data-lang="python" markdown="1">
{% highlight python %}
def sendPartition(iter):