author      jerryshao <saisai.shao@intel.com>    2015-04-27 23:48:02 -0700
committer   Tathagata Das <tathagata.das1565@gmail.com>    2015-04-27 23:48:02 -0700
commit      9e4e82b7bca1129bcd5e0274b9ae1b1be3fb93da (patch)
tree        5df0f823975fd6f0ef7132a7346ca993bc30d63b /python/pyspark
parent      29576e786072bd4218e10036ddfc8d367b1c1446 (diff)
[SPARK-5946] [STREAMING] Add Python API for direct Kafka stream
Currently only the `createDirectStream` API is added; I'm not sure if `createRDD` is also needed, since some Java objects need to be wrapped in Python. Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #4723 from jerryshao/direct-kafka-python-api and squashes the following commits:

a1fe97c [jerryshao] Fix rebase issue
eebf333 [jerryshao] Address the comments
da40f4e [jerryshao] Fix Python 2.6 syntax error issue
5c0ee85 [jerryshao] Style fix
4aeac18 [jerryshao] Fix bug in example code
7146d86 [jerryshao] Add unit test
bf3bdd6 [jerryshao] Add more APIs and address the comments
f5b3801 [jerryshao] Small style fix
8641835 [Saisai Shao] Rebase and update the code
589c05b [Saisai Shao] Fix the style
d6fcb6a [Saisai Shao] Address the comments
dfda902 [Saisai Shao] Style fix
0f7d168 [Saisai Shao] Add the doc and fix some style issues
67e6880 [Saisai Shao] Fix test bug
917b0db [Saisai Shao] Add Python createRDD API for Kafka direct stream
c3fc11d [jerryshao] Modify the docs
2c00936 [Saisai Shao] Address the comments
3360f44 [jerryshao] Fix code style
e0e0f0d [jerryshao] Code clean and bug fix
338c41f [Saisai Shao] Add Python API and example for direct Kafka stream
Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/streaming/kafka.py | 167
-rw-r--r--  python/pyspark/streaming/tests.py |  84
2 files changed, 237 insertions, 14 deletions
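
For orientation, here is a minimal, hypothetical usage sketch of the `createDirectStream` API introduced by this patch. The broker address, topic name, and batch interval are illustrative, and the spark-streaming-kafka assembly jar is assumed to be on the classpath (the error message preserved in the diff below explains how to add it).

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="DirectKafkaSketch")   # illustrative app name
    ssc = StreamingContext(sc, 2)                    # 2-second batch interval

    # Receiver-less stream: offsets are tracked by the stream itself rather
    # than being committed to Zookeeper.
    kafkaParams = {"metadata.broker.list": "localhost:9092"}
    stream = KafkaUtils.createDirectStream(ssc, ["my-topic"], kafkaParams)

    # Records arrive as (key, value) pairs, decoded with utf8_decoder by default.
    stream.map(lambda kv: kv[1]).count().pprint()

    ssc.start()
    ssc.awaitTermination()
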
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 8d610d6569..e278b29003 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -17,11 +17,12 @@
from py4j.java_gateway import Py4JJavaError
+from pyspark.rdd import RDD
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.streaming import DStream
-__all__ = ['KafkaUtils', 'utf8_decoder']
+__all__ = ['Broker', 'KafkaUtils', 'OffsetRange', 'TopicAndPartition', 'utf8_decoder']
def utf8_decoder(s):
@@ -67,7 +68,104 @@ class KafkaUtils(object):
except Py4JJavaError as e:
# TODO: use --jar once it also work on driver
if 'ClassNotFoundException' in str(e.java_exception):
- print("""
+ KafkaUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+ return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
+
+ @staticmethod
+ def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
+ keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+ """
+ .. note:: Experimental
+
+ Create an input stream that directly pulls messages from a Kafka broker at specific offsets.
+
+ This is not a receiver-based Kafka input stream; it directly pulls messages from Kafka
+ in each batch duration and processes them without intermediate storage.
+
+ This does not use Zookeeper to store offsets. The consumed offsets are tracked
+ by the stream itself. For interoperability with Kafka monitoring tools that depend on
+ Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+ You can access the offsets used in each batch from the generated RDDs (see
+ org.apache.spark.streaming.kafka.HasOffsetRanges in the Scala/Java API).
+
+ To recover from driver failures, you have to enable checkpointing in the StreamingContext.
+ The information on consumed offsets can be recovered from the checkpoint.
+ See the programming guide for details (constraints, etc.).
+
+ :param ssc: StreamingContext object.
+ :param topics: list of topic names to consume.
+ :param kafkaParams: Additional params for Kafka.
+ :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
+ point of the stream.
+ :param keyDecoder: A function used to decode key (default is utf8_decoder).
+ :param valueDecoder: A function used to decode value (default is utf8_decoder).
+ :return: A DStream object
+ """
+ if not isinstance(topics, list):
+ raise TypeError("topics should be list")
+ if not isinstance(kafkaParams, dict):
+ raise TypeError("kafkaParams should be dict")
+
+ try:
+ helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+ .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+ helper = helperClass.newInstance()
+
+ jfromOffsets = dict([(k._jTopicAndPartition(helper),
+ v) for (k, v) in fromOffsets.items()])
+ jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
+ except Py4JJavaError as e:
+ if 'ClassNotFoundException' in str(e.java_exception):
+ KafkaUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ stream = DStream(jstream, ssc, ser)
+ return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
+
+ @staticmethod
+ def createRDD(sc, kafkaParams, offsetRanges, leaders={},
+ keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
+ """
+ .. note:: Experimental
+
+ Create an RDD from Kafka using offset ranges for each topic and partition.
+
+ :param sc: SparkContext object
+ :param kafkaParams: Additional params for Kafka
+ :param offsetRanges: list of OffsetRange objects specifying topic:partition:[start, end) to consume
+ :param leaders: Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty
+ map, in which case leaders will be looked up on the driver.
+ :param keyDecoder: A function used to decode key (default is utf8_decoder)
+ :param valueDecoder: A function used to decode value (default is utf8_decoder)
+ :return: An RDD object
+ """
+ if not isinstance(kafkaParams, dict):
+ raise TypeError("kafkaParams should be dict")
+ if not isinstance(offsetRanges, list):
+ raise TypeError("offsetRanges should be list")
+
+ try:
+ helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+ .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ joffsetRanges = [o._jOffsetRange(helper) for o in offsetRanges]
+ jleaders = dict([(k._jTopicAndPartition(helper),
+ v._jBroker(helper)) for (k, v) in leaders.items()])
+ jrdd = helper.createRDD(sc._jsc, kafkaParams, joffsetRanges, jleaders)
+ except Py4JJavaError as e:
+ if 'ClassNotFoundException' in str(e.java_exception):
+ KafkaUtils._printErrorMsg(sc)
+ raise e
+
+ ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
+ rdd = RDD(jrdd, sc, ser)
+ return rdd.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
+
+ @staticmethod
+ def _printErrorMsg(sc):
+ print("""
________________________________________________________________________________________________
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
@@ -85,8 +183,63 @@ ________________________________________________________________________________
________________________________________________________________________________________________
-""" % (ssc.sparkContext.version, ssc.sparkContext.version))
- raise e
- ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
- stream = DStream(jstream, ssc, ser)
- return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
+""" % (sc.version, sc.version))
+
+
+class OffsetRange(object):
+ """
+ Represents a range of offsets from a single Kafka TopicAndPartition.
+ """
+
+ def __init__(self, topic, partition, fromOffset, untilOffset):
+ """
+ Create an OffsetRange to represent a range of offsets.
+ :param topic: Kafka topic name.
+ :param partition: Kafka partition id.
+ :param fromOffset: Inclusive starting offset.
+ :param untilOffset: Exclusive ending offset.
+ """
+ self._topic = topic
+ self._partition = partition
+ self._fromOffset = fromOffset
+ self._untilOffset = untilOffset
+
+ def _jOffsetRange(self, helper):
+ return helper.createOffsetRange(self._topic, self._partition, self._fromOffset,
+ self._untilOffset)
+
+
+class TopicAndPartition(object):
+ """
+ Represents a specific topic and partition for Kafka.
+ """
+
+ def __init__(self, topic, partition):
+ """
+ Create a Python TopicAndPartition to map to the related Java object.
+ :param topic: Kafka topic name.
+ :param partition: Kafka partition id.
+ """
+ self._topic = topic
+ self._partition = partition
+
+ def _jTopicAndPartition(self, helper):
+ return helper.createTopicAndPartition(self._topic, self._partition)
+
+
+class Broker(object):
+ """
+ Represents the host and port info for a Kafka broker.
+ """
+
+ def __init__(self, host, port):
+ """
+ Create a Python Broker to map to the related Java object.
+ :param host: Broker's hostname.
+ :param port: Broker's port.
+ """
+ self._host = host
+ self._port = port
+
+ def _jBroker(self, helper):
+ return helper.createBroker(self._host, self._port)
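
To show how the helper classes added above fit together, here is a minimal, hypothetical sketch of the batch-oriented `createRDD` API. The topic, offsets, and broker address are illustrative; the `leaders` argument may also be omitted, in which case partition leaders are looked up on the driver.

    from pyspark import SparkContext
    from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition

    sc = SparkContext(appName="KafkaRDDSketch")      # illustrative app name
    kafkaParams = {"metadata.broker.list": "localhost:9092"}

    # Consume offsets [0, 100) of partition 0 of "my-topic" as a single RDD.
    offsetRanges = [OffsetRange("my-topic", 0, 0, 100)]

    # Optionally pin the partition to a known leader broker.
    leaders = {TopicAndPartition("my-topic", 0): Broker("localhost", 9092)}

    rdd = KafkaUtils.createRDD(sc, kafkaParams, offsetRanges, leaders)
    print(rdd.map(lambda kv: kv[1]).collect())
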
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 5fa1e5ef08..7c06c20345 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ from itertools import chain
import time
import operator
import tempfile
+import random
import struct
from functools import reduce
@@ -35,7 +36,7 @@ else:
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
-from pyspark.streaming.kafka import KafkaUtils
+from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
class PySparkStreamingTestCase(unittest.TestCase):
@@ -590,9 +591,27 @@ class KafkaStreamTests(PySparkStreamingTestCase):
super(KafkaStreamTests, self).tearDown()
+ def _randomTopic(self):
+ return "topic-%d" % random.randint(0, 10000)
+
+ def _validateStreamResult(self, sendData, stream):
+ result = {}
+ for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
+ sum(sendData.values()))):
+ result[i] = result.get(i, 0) + 1
+
+ self.assertEqual(sendData, result)
+
+ def _validateRddResult(self, sendData, rdd):
+ result = {}
+ for i in rdd.map(lambda x: x[1]).collect():
+ result[i] = result.get(i, 0) + 1
+
+ self.assertEqual(sendData, result)
+
def test_kafka_stream(self):
"""Test the Python Kafka stream API."""
- topic = "topic1"
+ topic = self._randomTopic()
sendData = {"a": 3, "b": 5, "c": 10}
self._kafkaTestUtils.createTopic(topic)
@@ -601,13 +620,64 @@ class KafkaStreamTests(PySparkStreamingTestCase):
stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
{"auto.offset.reset": "smallest"})
+ self._validateStreamResult(sendData, stream)
- result = {}
- for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
- sum(sendData.values()))):
- result[i] = result.get(i, 0) + 1
+ def test_kafka_direct_stream(self):
+ """Test the Python direct Kafka stream API."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2, "c": 3}
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
+ "auto.offset.reset": "smallest"}
- self.assertEqual(sendData, result)
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
+ self._validateStreamResult(sendData, stream)
+
+ @unittest.skipIf(sys.version >= "3", "long type is not supported")
+ def test_kafka_direct_stream_from_offset(self):
+ """Test the Python direct Kafka stream API with start offset specified."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2, "c": 3}
+ fromOffsets = {TopicAndPartition(topic, 0): long(0)}
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
+ self._validateStreamResult(sendData, stream)
+
+ @unittest.skipIf(sys.version >= "3", "long type is not supported")
+ def test_kafka_rdd(self):
+ """Test the Python direct Kafka RDD API."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2}
+ offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
+ self._validateRddResult(sendData, rdd)
+
+ @unittest.skipIf(sys.version >= "3", "long type is not supported")
+ def test_kafka_rdd_with_leaders(self):
+ """Test the Python direct Kafka RDD API with leaders."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2, "c": 3}
+ offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+ address = self._kafkaTestUtils.brokerAddress().split(":")
+ leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))}
+
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
+ self._validateRddResult(sendData, rdd)
if __name__ == "__main__":
unittest.main()
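
The test_kafka_direct_stream_from_offset test above exercises the optional `fromOffsets` argument; the following hypothetical sketch shows that path on its own. Values are illustrative, and Python 2 is assumed, since the offsets are passed as `long` (which is why those tests are skipped on Python 3).

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

    sc = SparkContext(appName="DirectKafkaFromOffsetSketch")  # illustrative
    ssc = StreamingContext(sc, 2)

    kafkaParams = {"metadata.broker.list": "localhost:9092"}
    # Start partition 0 of "my-topic" from offset 0 (wrapped in long for Python 2).
    fromOffsets = {TopicAndPartition("my-topic", 0): long(0)}

    stream = KafkaUtils.createDirectStream(ssc, ["my-topic"], kafkaParams, fromOffsets)
    stream.pprint()

    ssc.start()
    ssc.awaitTermination()
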