author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-07 13:47:44 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-07 13:47:44 -0800 |
commit | edc87e18922b98be47c298cdc3daa2b049a737e9 | |
tree | 1feaf9bb0c442fa745bc1e7a7824370ba29b819e /sql/core/src/test/scala | |
parent | bb94f61a7ac97bf904ec0e8d5a4ab69a4142443f | |
# [SPARK-18588][TESTS] Fix flaky test: KafkaSourceStressForDontFailOnDataLossSuite
## What changes were proposed in this pull request?
Fixed the following failures:
```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.0000790851666665 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout.
```
```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
at java.util.ArrayList.addAll(ArrayList.java:577)
at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at
...
```
## How was this patch tested?
Tested in #16048 by running it many times.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16109 from zsxwing/fix-kafka-flaky-test.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala | 8 |
1 file changed, 6 insertions(+), 2 deletions(-)
```
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index db24ee8b46..2239f10870 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -48,14 +48,18 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
    */
   protected implicit def sqlContext: SQLContext = _spark.sqlContext
 
+  protected def createSparkSession: TestSparkSession = {
+    new TestSparkSession(
+      sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+  }
+
   /**
    * Initialize the [[TestSparkSession]].
    */
   protected override def beforeAll(): Unit = {
     SparkSession.sqlListener.set(null)
     if (_spark == null) {
-      _spark = new TestSparkSession(
-        sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+      _spark = createSparkSession
     }
     // Ensure we have initialized the context before calling parent code
     super.beforeAll()
```
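Within this diffstat-limited view, the whole change is to extract `TestSparkSession` construction into an overridable `createSparkSession` factory, so a suite that needs a differently configured session can supply one instead of patching the shared setup in `beforeAll()`. Below is a minimal sketch of how a suite might use the new hook; the suite name, the extra config key, and the test body are hypothetical illustrations, not part of this commit (it assumes Spark's test-support classes `QueryTest`, `TestSparkSession`, and `DebugFilesystem`):

```scala
package org.apache.spark.sql.test

import org.apache.spark.DebugFilesystem
import org.apache.spark.sql.QueryTest

// Hypothetical suite, for illustration only: it overrides the
// createSparkSession hook added by this patch instead of rebuilding
// the session after the fact in beforeAll().
class CustomConfSuite extends QueryTest with SharedSQLContext {

  override protected def createSparkSession: TestSparkSession = {
    new TestSparkSession(
      sparkConf
        .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
        // Hypothetical extra setting a suite might need.
        .set("spark.sql.shuffle.partitions", "4"))
  }

  test("uses the customized session") {
    assert(spark.conf.get("spark.sql.shuffle.partitions") == "4")
  }
}
```

The design point is that subclasses override a single factory method rather than duplicating the initialization logic in `beforeAll()`, which keeps the session lifecycle (creation, listener reset, teardown) in one place.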