aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10-sql')
-rw-r--r--external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala5
1 files changed, 5 insertions, 0 deletions
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 496af7e39a..802dd040ae 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -112,6 +112,11 @@ private[kafka010] class KafkaSourceRDD(
buf.toArray
}
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val part = split.asInstanceOf[KafkaSourceRDDPartition]
+ part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
+ }
+
override def compute(
thePart: Partition,
context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {