aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-10-20 10:50:34 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-10-20 10:50:34 -0700
commit947f4f25273161dc4719419a35613a71c2e2a150 (patch)
treeee1cf12ded9530cb16b2bf0456af2f24d0836468 /external
parent84b245f2dd31c1cebbf12458bf11f67e287e93f4 (diff)
downloadspark-947f4f25273161dc4719419a35613a71c2e2a150.tar.gz
spark-947f4f25273161dc4719419a35613a71c2e2a150.tar.bz2
spark-947f4f25273161dc4719419a35613a71c2e2a150.zip
[SPARK-17999][KAFKA][SQL] Add getPreferredLocations for KafkaSourceRDD
## What changes were proposed in this pull request? The newly implemented Structured Streaming `KafkaSource` did calculate the preferred locations for each topic partition, but didn't offer this information through RDD's `getPreferredLocations` method. So here propose to add this method in `KafkaSourceRDD`. ## How was this patch tested? Manual verification. Author: jerryshao <sshao@hortonworks.com> Closes #15545 from jerryshao/SPARK-17999.
Diffstat (limited to 'external')
-rw-r--r--external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala5
1 files changed, 5 insertions, 0 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 496af7e39a..802dd040ae 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -112,6 +112,11 @@ private[kafka010] class KafkaSourceRDD(
buf.toArray
}
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val part = split.asInstanceOf[KafkaSourceRDDPartition]
+ part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
+ }
+
override def compute(
thePart: Partition,
context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {