diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-03-25 12:04:47 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-03-25 12:04:47 -0700 |
commit | 11fa8741ca5b550e4f373c5d6e520c64f5118d0c (patch) | |
tree | 67ada7c1559e7b99c776f0ec36974f52730bb4a3 | |
parent | b5f8c36e3c93750cea1473019ddd95538eccb4f3 (diff) | |
download | spark-11fa8741ca5b550e4f373c5d6e520c64f5118d0c.tar.gz spark-11fa8741ca5b550e4f373c5d6e520c64f5118d0c.tar.bz2 spark-11fa8741ca5b550e4f373c5d6e520c64f5118d0c.zip |
[SQL][HOTFIX] Fix flakiness in StateStoreRDDSuite
## What changes were proposed in this pull request?
StateStoreCoordinator.reportActiveInstance is async, so subsequent state checks must be wrapped in `eventually`.
## How was this patch tested?
Jenkins tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #11924 from tdas/state-store-flaky-fix.
2 files changed, 8 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala index c99c2f505f..a7e3262626 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala @@ -71,7 +71,6 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { assert(coordinatorRef.verifyIfInstanceActive(id1, exec) === true) assert(coordinatorRef.verifyIfInstanceActive(id2, exec) === true) assert(coordinatorRef.verifyIfInstanceActive(id3, exec) === true) - } coordinatorRef.deactivateInstances("x") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala index 24cec30fa3..df50cbde56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala @@ -23,6 +23,8 @@ import java.nio.file.Files import scala.util.Random import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.LocalSparkContext._ @@ -121,9 +123,12 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn val coordinatorRef = sqlContext.streams.stateStoreCoordinator coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 0), "host1", "exec1") coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 1), "host2", "exec2") - assert( - coordinatorRef.getLocation(StateStoreId(path, opId, 0)) === - 
Some(ExecutorCacheTaskLocation("host1", "exec1").toString)) + + eventually(timeout(10 seconds)) { + assert( + coordinatorRef.getLocation(StateStoreId(path, opId, 0)) === + Some(ExecutorCacheTaskLocation("host1", "exec1").toString)) + } val rdd = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore( increment, path, opId, storeVersion = 0, keySchema, valueSchema) |