aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala21
1 files changed, 20 insertions, 1 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 7e60410c90..2ce2760b7f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -50,7 +50,7 @@ import org.apache.spark.SparkConf
*
* The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
*/
-class KafkaTestUtils extends Logging {
+class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging {
// Zookeeper related configurations
private val zkHost = "localhost"
@@ -249,6 +249,24 @@ class KafkaTestUtils extends Logging {
offsets
}
+ def cleanupLogs(): Unit = {
+ server.logManager.cleanupLogs()
+ }
+
+ def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
+ val kc = new KafkaConsumer[String, String](consumerConfiguration)
+ logInfo("Created consumer to get earliest offsets")
+ kc.subscribe(topics.asJavaCollection)
+ kc.poll(0)
+ val partitions = kc.assignment()
+ kc.pause(partitions)
+ kc.seekToBeginning(partitions)
+ val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
+ kc.close()
+ logInfo("Closed consumer to get earliest offsets")
+ offsets
+ }
+
def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
val kc = new KafkaConsumer[String, String](consumerConfiguration)
logInfo("Created consumer to get latest offsets")
@@ -274,6 +292,7 @@ class KafkaTestUtils extends Logging {
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props.put("delete.topic.enable", "true")
+ props.putAll(withBrokerProps.asJava)
props
}