diff options
Diffstat (limited to 'external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala')
-rw-r--r-- | external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 21 |
1 files changed, 20 insertions, 1 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 7e60410c90..2ce2760b7f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -50,7 +50,7 @@ import org.apache.spark.SparkConf * * The reason to put Kafka test utility class in src is to test Python related Kafka APIs. */ -class KafkaTestUtils extends Logging { +class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging { // Zookeeper related configurations private val zkHost = "localhost" @@ -249,6 +249,24 @@ class KafkaTestUtils extends Logging { offsets } + def cleanupLogs(): Unit = { + server.logManager.cleanupLogs() + } + + def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = { + val kc = new KafkaConsumer[String, String](consumerConfiguration) + logInfo("Created consumer to get earliest offsets") + kc.subscribe(topics.asJavaCollection) + kc.poll(0) + val partitions = kc.assignment() + kc.pause(partitions) + kc.seekToBeginning(partitions) + val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap + kc.close() + logInfo("Closed consumer to get earliest offsets") + offsets + } + def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = { val kc = new KafkaConsumer[String, String](consumerConfiguration) logInfo("Created consumer to get latest offsets") @@ -274,6 +292,7 @@ class KafkaTestUtils extends Logging { props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") props.put("delete.topic.enable", "true") + props.putAll(withBrokerProps.asJava) props } |