Diffstat (limited to 'external/kafka')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala  13
1 file changed, 13 insertions(+), 0 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 0e33362d34..f3b01bd60b 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -670,4 +670,17 @@ private class KafkaUtilsPythonHelper {
     TopicAndPartition(topic, partition)
   def createBroker(host: String, port: JInt): Broker = Broker(host, port)
+
+  def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
+    val parentRDDs = rdd.getNarrowAncestors
+    val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, _, _, _]])
+
+    require(
+      kafkaRDDs.length == 1,
+      "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated " +
+        "with this RDD; please call this method only on a Kafka RDD.")
+
+    val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]]
+    kafkaRDD.offsetRanges.toSeq
+  }
}
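
For context (not part of this patch): a minimal Scala sketch of the mechanism the new helper builds on. RDDs produced by the Kafka direct stream implement HasOffsetRanges, so their per-partition offset ranges can be read inside foreachRDD; offsetRangesOfKafkaRDD simply locates that KafkaRDD ancestor on behalf of the Python API. The application name, broker address, and topic below are placeholder values, not anything defined by this commit.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object OffsetRangeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OffsetRangeSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder broker address and topic for this sketch.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("test-topic"))

    stream.foreachRDD { rdd =>
      // Each RDD of a direct stream is a KafkaRDD and therefore implements HasOffsetRanges.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"topic=${o.topic} partition=${o.partition} " +
          s"from=${o.fromOffset} until=${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that the cast must be applied to the RDD handed directly to foreachRDD, before any transformation; once the RDD has been mapped or repartitioned, the KafkaRDD is only reachable as an ancestor, which is the situation the getNarrowAncestors lookup in the patch handles for PySpark.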