diff options
author | jerryshao <sshao@hortonworks.com> | 2016-10-20 10:50:34 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-10-20 10:50:34 -0700 |
commit | 947f4f25273161dc4719419a35613a71c2e2a150 (patch) | |
tree | ee1cf12ded9530cb16b2bf0456af2f24d0836468 /external | |
parent | 84b245f2dd31c1cebbf12458bf11f67e287e93f4 (diff) | |
download | spark-947f4f25273161dc4719419a35613a71c2e2a150.tar.gz spark-947f4f25273161dc4719419a35613a71c2e2a150.tar.bz2 spark-947f4f25273161dc4719419a35613a71c2e2a150.zip |
[SPARK-17999][KAFKA][SQL] Add getPreferredLocations for KafkaSourceRDD
## What changes were proposed in this pull request?
The newly implemented Structured Streaming `KafkaSource` did calculate the preferred locations for each topic partition, but didn't offer this information through RDD's `getPreferredLocations` method. So here propose to add this method in `KafkaSourceRDD`.
## How was this patch tested?
Manual verification.
Author: jerryshao <sshao@hortonworks.com>
Closes #15545 from jerryshao/SPARK-17999.
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala | 5 |
1 files changed, 5 insertions, 0 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala index 496af7e39a..802dd040ae 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala @@ -112,6 +112,11 @@ private[kafka010] class KafkaSourceRDD( buf.toArray } + override def getPreferredLocations(split: Partition): Seq[String] = { + val part = split.asInstanceOf[KafkaSourceRDDPartition] + part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty) + } + override def compute( thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = { |