diff options
author | jerryshao <saisai.shao@intel.com> | 2015-04-27 23:48:02 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-04-27 23:48:02 -0700 |
commit | 9e4e82b7bca1129bcd5e0274b9ae1b1be3fb93da (patch) | |
tree | 5df0f823975fd6f0ef7132a7346ca993bc30d63b /examples | |
parent | 29576e786072bd4218e10036ddfc8d367b1c1446 (diff) | |
download | spark-9e4e82b7bca1129bcd5e0274b9ae1b1be3fb93da.tar.gz spark-9e4e82b7bca1129bcd5e0274b9ae1b1be3fb93da.tar.bz2 spark-9e4e82b7bca1129bcd5e0274b9ae1b1be3fb93da.zip |
[SPARK-5946] [STREAMING] Add Python API for direct Kafka stream
Currently only added `createDirectStream` API, I'm not sure if `createRDD` is also needed, since some Java object needs to be wrapped in Python. Please help to review, thanks a lot.
Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>
Closes #4723 from jerryshao/direct-kafka-python-api and squashes the following commits:
a1fe97c [jerryshao] Fix rebase issue
eebf333 [jerryshao] Address the comments
da40f4e [jerryshao] Fix Python 2.6 Syntax error issue
5c0ee85 [jerryshao] Style fix
4aeac18 [jerryshao] Fix bug in example code
7146d86 [jerryshao] Add unit test
bf3bdd6 [jerryshao] Add more APIs and address the comments
f5b3801 [jerryshao] Small style fix
8641835 [Saisai Shao] Rebase and update the code
589c05b [Saisai Shao] Fix the style
d6fcb6a [Saisai Shao] Address the comments
dfda902 [Saisai Shao] Style fix
0f7d168 [Saisai Shao] Add the doc and fix some style issues
67e6880 [Saisai Shao] Fix test bug
917b0db [Saisai Shao] Add Python createRDD API for Kakfa direct stream
c3fc11d [jerryshao] Modify the docs
2c00936 [Saisai Shao] address the comments
3360f44 [jerryshao] Fix code style
e0e0f0d [jerryshao] Code clean and bug fix
338c41f [Saisai Shao] Add python API and example for direct kafka stream
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/python/streaming/direct_kafka_wordcount.py | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/examples/src/main/python/streaming/direct_kafka_wordcount.py b/examples/src/main/python/streaming/direct_kafka_wordcount.py new file mode 100644 index 0000000000..6ef188a220 --- /dev/null +++ b/examples/src/main/python/streaming/direct_kafka_wordcount.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka in every 2 seconds. + Usage: direct_kafka_wordcount.py <broker_list> <topic> + + To run this on your local machine, you need to setup Kafka and create a producer first, see + http://kafka.apache.org/documentation.html#quickstart + + and then run the example + `$ bin/spark-submit --jars external/kafka-assembly/target/scala-*/\ + spark-streaming-kafka-assembly-*.jar \ + examples/src/main/python/streaming/direct_kafka_wordcount.py \ + localhost:9092 test` +""" + +import sys + +from pyspark import SparkContext +from pyspark.streaming import StreamingContext +from pyspark.streaming.kafka import KafkaUtils + +if __name__ == "__main__": + if len(sys.argv) != 3: + print >> sys.stderr, "Usage: direct_kafka_wordcount.py <broker_list> <topic>" + exit(-1) + + sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") + ssc = StreamingContext(sc, 2) + + brokers, topic = sys.argv[1:] + kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) + lines = kvs.map(lambda x: x[1]) + counts = lines.flatMap(lambda line: line.split(" ")) \ + .map(lambda word: (word, 1)) \ + .reduceByKey(lambda a, b: a+b) + counts.pprint() + + ssc.start() + ssc.awaitTermination() |