author    Shixiong Zhu <shixiong@databricks.com>    2016-12-07 13:47:44 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2016-12-07 13:47:44 -0800
commit    edc87e18922b98be47c298cdc3daa2b049a737e9
tree      1feaf9bb0c442fa745bc1e7a7824370ba29b819e
parent    bb94f61a7ac97bf904ec0e8d5a4ab69a4142443f
[SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite
## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.0000790851666665 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
  at java.util.ArrayList.addAll(ArrayList.java:577)
  at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
  at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
  at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
  at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
  at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
  at ...
```

## How was this patch tested?

Tested in #16048 by running many times.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16109 from zsxwing/fix-kafka-flaky-test.
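The deletion-timeout failure is addressed by retrying the topic deletion until it is verified, and the NPE by allowing task retries and longer fetch-offset retry intervals. A minimal sketch of the retry-until-verified idea, assuming hypothetical `deleteTopic` and `topicIsFullyDeleted` callbacks rather than the real `KafkaTestUtils` internals:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object TopicDeletionRetrySketch {
  /**
   * Keep re-issuing the delete until every trace of the topic is gone.
   * `deleteTopic` stands in for the AdminUtils call; `topicIsFullyDeleted`
   * stands in for the ZK-path, replica-manager and log-manager checks.
   */
  def deleteTopicWithRetries(
      topic: String,
      deleteTopic: String => Unit,
      topicIsFullyDeleted: String => Boolean): Unit = {
    deleteTopic(topic)
    eventually(timeout(60.seconds), interval(200.millis)) {
      try {
        assert(topicIsFullyDeleted(topic), s"$topic not deleted after timeout")
      } catch {
        case e: Throwable =>
          // Zookeeper is updated asynchronously while messages are still being
          // pushed, so a topic can be recreated after deletion; delete it again
          // and let `eventually` retry until the check finally passes.
          deleteTopic(topic)
          throw e
      }
    }
  }
}
```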
Diffstat (limited to 'external')
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala | 39
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 2
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 11
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 75
4 files changed, 90 insertions(+), 37 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9918..3f396a7e6b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
var toFetchOffset = offset
while (toFetchOffset != UNKNOWN_OFFSET) {
try {
- return fetchData(toFetchOffset, pollTimeoutMs)
+ return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
} catch {
case e: OffsetOutOfRangeException =>
// When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
}
/**
- * Get the record at `offset`.
+ * Get the record for the given offset if available. Otherwise it will either throw error
+ * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
+ * or null.
*
* @throws OffsetOutOfRangeException if `offset` is out of range
* @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
*/
private def fetchData(
offset: Long,
- pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+ untilOffset: Long,
+ pollTimeoutMs: Long,
+ failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
// This is the first fetch, or the last pre-fetched data has been drained.
// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
@@ -190,10 +194,31 @@ private[kafka010] case class CachedKafkaConsumer private(
} else {
val record = fetchedData.next()
nextOffsetInFetchedData = record.offset + 1
- // `seek` is always called before "poll". So "record.offset" must be same as "offset".
- assert(record.offset == offset,
- s"The fetched data has a different offset: expected $offset but was ${record.offset}")
- record
+ // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
+ // available offset. Hence we need to handle offset mismatch.
+ if (record.offset > offset) {
+ // This may happen when some records aged out but their offsets already got verified
+ if (failOnDataLoss) {
+ reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
+ // Never happen as "reportDataLoss" will throw an exception
+ null
+ } else {
+ if (record.offset >= untilOffset) {
+ reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
+ null
+ } else {
+ reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
+ record
+ }
+ }
+ } else if (record.offset < offset) {
+ // This should not happen. If it does happen, then we probably misunderstand Kafka internal
+ // mechanism.
+ throw new IllegalStateException(
+ s"Tried to fetch $offset but the returned record offset was ${record.offset}")
+ } else {
+ record
+ }
}
}
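The new `fetchData` branch above reduces to a three-way comparison between the requested offset and the offset Kafka actually returns after a seek. A standalone sketch of that decision, using a hypothetical `Record` type and a simplified `reportDataLoss` in place of the real `CachedKafkaConsumer` members (the real code distinguishes the skip messages more finely):

```scala
object OffsetMismatchSketch {
  // Hypothetical record type; the real code uses ConsumerRecord[Array[Byte], Array[Byte]].
  case class Record(offset: Long, value: String)

  // Simplified stand-in: throws when data loss is fatal, otherwise just warns.
  def reportDataLoss(failOnDataLoss: Boolean, message: String): Unit =
    if (failOnDataLoss) throw new IllegalStateException(message)
    else println(s"WARN: $message")

  /** Decide what to return when Kafka hands back `record` after seeking to `offset`. */
  def resolveFetchedRecord(
      record: Record,
      offset: Long,
      untilOffset: Long,
      failOnDataLoss: Boolean): Option[Record] = {
    if (record.offset > offset) {
      // Records in [offset, record.offset) were lost, e.g. aged out by retention.
      reportDataLoss(failOnDataLoss, s"Cannot fetch records in [$offset, ${record.offset})")
      // Only reached when failOnDataLoss = false: skip ahead if the returned
      // record is still inside the requested range, otherwise give up on the range.
      if (record.offset >= untilOffset) None else Some(record)
    } else if (record.offset < offset) {
      // Kafka should never return an earlier offset after a seek; treat it as a bug.
      throw new IllegalStateException(
        s"Tried to fetch $offset but the returned record offset was ${record.offset}")
    } else {
      Some(record)
    }
  }
}
```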
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index d9ab4bb4f8..92ee0ed93d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
private val offsetFetchAttemptIntervalMs =
- sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+ sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
private val maxOffsetsPerTrigger =
sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 2d6ccb22dd..0e40abac65 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -31,11 +31,12 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
+import org.apache.spark.SparkContext
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
@@ -811,6 +812,11 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+ override def createSparkSession(): TestSparkSession = {
+ // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+ new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+ }
+
override def beforeAll(): Unit = {
super.beforeAll()
testUtils = new KafkaTestUtils {
@@ -839,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
}
}
- ignore("stress test for failOnDataLoss=false") {
+ test("stress test for failOnDataLoss=false") {
val reader = spark
.readStream
.format("kafka")
@@ -848,6 +854,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
.option("subscribePattern", "failOnDataLoss.*")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
+ .option("fetchOffset.retryIntervalMs", "3000")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index f43917e151..fd1689acf6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -184,7 +184,7 @@ class KafkaTestUtils extends Logging {
def deleteTopic(topic: String): Unit = {
val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
AdminUtils.deleteTopic(zkUtils, topic)
- verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+ verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
}
/** Add new paritions to a Kafka topic */
@@ -286,36 +286,57 @@ class KafkaTestUtils extends Logging {
props
}
+ /** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
private def verifyTopicDeletion(
+ topic: String,
+ numPartitions: Int,
+ servers: Seq[KafkaServer]): Unit = {
+ val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+
+ import ZkUtils._
+ // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+ assert(
+ !zkUtils.pathExists(getDeleteTopicPath(topic)),
+ s"${getDeleteTopicPath(topic)} still exists")
+ assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
+ // ensure that the topic-partition has been deleted from all brokers' replica managers
+ assert(servers.forall(server => topicAndPartitions.forall(tp =>
+ server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+ s"topic $topic still exists in the replica manager")
+ // ensure that logs from all replicas are deleted if delete topic is marked successful
+ assert(servers.forall(server => topicAndPartitions.forall(tp =>
+ server.getLogManager().getLog(tp).isEmpty)),
+ s"topic $topic still exists in log mananger")
+ // ensure that topic is removed from all cleaner offsets
+ assert(servers.forall(server => topicAndPartitions.forall { tp =>
+ val checkpoints = server.getLogManager().logDirs.map { logDir =>
+ new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+ }
+ checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+ }), s"checkpoint for topic $topic still exists")
+ // ensure the topic is gone
+ assert(
+ !zkUtils.getAllTopics().contains(topic),
+ s"topic $topic still exists on zookeeper")
+ }
+
+ /** Verify topic is deleted. Retry to delete the topic if not. */
+ private def verifyTopicDeletionWithRetries(
zkUtils: ZkUtils,
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]) {
- import ZkUtils._
- val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
- def isDeleted(): Boolean = {
- // wait until admin path for delete topic is deleted, signaling completion of topic deletion
- val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
- val topicPath = !zkUtils.pathExists(getTopicPath(topic))
- // ensure that the topic-partition has been deleted from all brokers' replica managers
- val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
- server.replicaManager.getPartition(tp.topic, tp.partition) == None))
- // ensure that logs from all replicas are deleted if delete topic is marked successful
- val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
- server.getLogManager().getLog(tp).isEmpty))
- // ensure that topic is removed from all cleaner offsets
- val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
- val checkpoints = server.getLogManager().logDirs.map { logDir =>
- new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
- }
- checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
- })
- // ensure the topic is gone
- val deleted = !zkUtils.getAllTopics().contains(topic)
- deletePath && topicPath && replicaManager && logManager && cleaner && deleted
- }
- eventually(timeout(60.seconds)) {
- assert(isDeleted, s"$topic not deleted after timeout")
+ eventually(timeout(60.seconds), interval(200.millis)) {
+ try {
+ verifyTopicDeletion(topic, numPartitions, servers)
+ } catch {
+ case e: Throwable =>
+ // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
+ // chance that a topic will be recreated after deletion due to the asynchronous update.
+ // Hence, delete the topic and retry.
+ AdminUtils.deleteTopic(zkUtils, topic)
+ throw e
+ }
}
}
@@ -331,7 +352,7 @@ class KafkaTestUtils extends Logging {
case _ =>
false
}
- eventually(timeout(10.seconds)) {
+ eventually(timeout(60.seconds)) {
assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
}
}