author     jerryshao <saisai.shao@intel.com>  2015-04-27 23:48:02 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-04-27 23:48:02 -0700
commit     9e4e82b7bca1129bcd5e0274b9ae1b1be3fb93da (patch)
tree       5df0f823975fd6f0ef7132a7346ca993bc30d63b /examples
parent     29576e786072bd4218e10036ddfc8d367b1c1446 (diff)
[SPARK-5946] [STREAMING] Add Python API for direct Kafka stream
Currently only added `createDirectStream` API; I'm not sure if `createRDD` is also needed, since some Java object needs to be wrapped in Python. Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #4723 from jerryshao/direct-kafka-python-api and squashes the following commits:

a1fe97c [jerryshao] Fix rebase issue
eebf333 [jerryshao] Address the comments
da40f4e [jerryshao] Fix Python 2.6 Syntax error issue
5c0ee85 [jerryshao] Style fix
4aeac18 [jerryshao] Fix bug in example code
7146d86 [jerryshao] Add unit test
bf3bdd6 [jerryshao] Add more APIs and address the comments
f5b3801 [jerryshao] Small style fix
8641835 [Saisai Shao] Rebase and update the code
589c05b [Saisai Shao] Fix the style
d6fcb6a [Saisai Shao] Address the comments
dfda902 [Saisai Shao] Style fix
0f7d168 [Saisai Shao] Add the doc and fix some style issues
67e6880 [Saisai Shao] Fix test bug
917b0db [Saisai Shao] Add Python createRDD API for Kafka direct stream
c3fc11d [jerryshao] Modify the docs
2c00936 [Saisai Shao] address the comments
3360f44 [jerryshao] Fix code style
e0e0f0d [jerryshao] Code clean and bug fix
338c41f [Saisai Shao] Add python API and example for direct kafka stream
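Besides the streaming API shown in the diff below, the patch also adds a batch-oriented `KafkaUtils.createRDD` (see commit 917b0db above). A minimal sketch of how it can be called, assuming a broker at localhost:9092 and a topic named "test" whose partition 0 holds at least 100 messages (broker address, topic, offsets, and app name are all hypothetical values, not part of this commit):

    # Hedged sketch of the new batch API; not part of the example file below.
    from pyspark import SparkContext
    from pyspark.streaming.kafka import KafkaUtils, OffsetRange

    sc = SparkContext(appName="PythonKafkaCreateRDD")

    # Each OffsetRange pins (topic, partition, fromOffset, untilOffset),
    # so the resulting RDD covers an exact, replayable slice of the log.
    offsetRanges = [OffsetRange("test", 0, 0, 100)]
    kafkaParams = {"metadata.broker.list": "localhost:9092"}

    # Returns an RDD of (key, value) pairs, decoded as UTF-8 by default.
    rdd = KafkaUtils.createRDD(sc, kafkaParams, offsetRanges)
    print(rdd.count())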
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/python/streaming/direct_kafka_wordcount.py | 55
1 file changed, 55 insertions(+), 0 deletions(-)
diff --git a/examples/src/main/python/streaming/direct_kafka_wordcount.py b/examples/src/main/python/streaming/direct_kafka_wordcount.py
new file mode 100644
index 0000000000..6ef188a220
--- /dev/null
+++ b/examples/src/main/python/streaming/direct_kafka_wordcount.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka every 2 seconds.
+ Usage: direct_kafka_wordcount.py <broker_list> <topic>
+
+ To run this on your local machine, you need to set up Kafka and create a producer first; see
+ http://kafka.apache.org/documentation.html#quickstart
+
+ and then run the example
+ `$ bin/spark-submit --jars external/kafka-assembly/target/scala-*/\
+ spark-streaming-kafka-assembly-*.jar \
+ examples/src/main/python/streaming/direct_kafka_wordcount.py \
+ localhost:9092 test`
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kafka import KafkaUtils
+
+if __name__ == "__main__":
+ if len(sys.argv) != 3:
+ print >> sys.stderr, "Usage: direct_kafka_wordcount.py <broker_list> <topic>"
+ exit(-1)
+
+ sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
+ ssc = StreamingContext(sc, 2)
+
+ brokers, topic = sys.argv[1:]
+ kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
+ lines = kvs.map(lambda x: x[1])
+ counts = lines.flatMap(lambda line: line.split(" ")) \
+ .map(lambda word: (word, 1)) \
+ .reduceByKey(lambda a, b: a+b)
+ counts.pprint()
+
+ ssc.start()
+ ssc.awaitTermination()
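For contrast with the pre-existing receiver-based API, a minimal sketch of the same pipeline fed by `KafkaUtils.createStream` (assuming a ZooKeeper quorum at localhost:2181 and a consumer group named "wordcount-group", both hypothetical):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingReceiverKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    # createStream consumes through ZooKeeper-backed receivers; the direct
    # stream in the example above instead queries the brokers and tracks
    # offsets itself, mapping each Kafka partition onto one RDD partition.
    kvs = KafkaUtils.createStream(ssc, "localhost:2181", "wordcount-group", {"test": 1})
    lines = kvs.map(lambda x: x[1])
    lines.pprint()

    ssc.start()
    ssc.awaitTermination()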