aboutsummaryrefslogtreecommitdiff
path: root/external/kafka
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2015-07-09 13:54:44 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-09 13:54:44 -0700
commit3ccebf36c5abe04702d4cf223552a94034d980fb (patch)
tree40990a75e75399422a2d34926505d5521db8edbe /external/kafka
parent1f6b0b1234cc03aa2e07aea7fec2de7563885238 (diff)
downloadspark-3ccebf36c5abe04702d4cf223552a94034d980fb.tar.gz
spark-3ccebf36c5abe04702d4cf223552a94034d980fb.tar.bz2
spark-3ccebf36c5abe04702d4cf223552a94034d980fb.zip
[SPARK-8389] [STREAMING] [PYSPARK] Expose KafkaRDDs offsetRange in Python
This PR propose a simple way to expose OffsetRange in Python code, also the usage of offsetRanges is similar to Scala/Java way, here in Python we could get OffsetRange like: ``` dstream.foreachRDD(lambda r: KafkaUtils.offsetRanges(r)) ``` Reason I didn't follow the way what SPARK-8389 suggested is that: Python Kafka API has one more step to decode the message compared to Scala/Java, Which makes Python API return a transformed RDD/DStream, not directly wrapped so-called JavaKafkaRDD, so it is hard to backtrack to the original RDD to get the offsetRange. Author: jerryshao <saisai.shao@intel.com> Closes #7185 from jerryshao/SPARK-8389 and squashes the following commits: 4c6d320 [jerryshao] Another way to fix subclass deserialization issue e6a8011 [jerryshao] Address the comments fd13937 [jerryshao] Fix serialization bug 7debf1c [jerryshao] bug fix cff3893 [jerryshao] refactor the code according to the comments 2aabf9e [jerryshao] Style fix 848c708 [jerryshao] Add HasOffsetRanges for Python
Diffstat (limited to 'external/kafka')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala13
1 files changed, 13 insertions, 0 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 0e33362d34..f3b01bd60b 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -670,4 +670,17 @@ private class KafkaUtilsPythonHelper {
TopicAndPartition(topic, partition)
def createBroker(host: String, port: JInt): Broker = Broker(host, port)
+
+ def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
+ val parentRDDs = rdd.getNarrowAncestors
+ val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, _, _, _]])
+
+ require(
+ kafkaRDDs.length == 1,
+ "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated" +
+ "with this RDD, please call this method only on a Kafka RDD.")
+
+ val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]]
+ kafkaRDD.offsetRanges.toSeq
+ }
}