aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-03-25 12:04:47 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-03-25 12:04:47 -0700
commit11fa8741ca5b550e4f373c5d6e520c64f5118d0c (patch)
tree67ada7c1559e7b99c776f0ec36974f52730bb4a3
parentb5f8c36e3c93750cea1473019ddd95538eccb4f3 (diff)
downloadspark-11fa8741ca5b550e4f373c5d6e520c64f5118d0c.tar.gz
spark-11fa8741ca5b550e4f373c5d6e520c64f5118d0c.tar.bz2
spark-11fa8741ca5b550e4f373c5d6e520c64f5118d0c.zip
[SQL][HOTFIX] Fix flakiness in StateStoreRDDSuite
## What changes were proposed in this pull request? StateStoreCoordinator.reportActiveInstance is async, so subsequence state checks must be in eventually. ## How was this patch tested? Jenkins tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #11924 from tdas/state-store-flaky-fix.
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala1
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala11
2 files changed, 8 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index c99c2f505f..a7e3262626 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -71,7 +71,6 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
assert(coordinatorRef.verifyIfInstanceActive(id1, exec) === true)
assert(coordinatorRef.verifyIfInstanceActive(id2, exec) === true)
assert(coordinatorRef.verifyIfInstanceActive(id3, exec) === true)
-
}
coordinatorRef.deactivateInstances("x")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
index 24cec30fa3..df50cbde56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
@@ -23,6 +23,8 @@ import java.nio.file.Files
import scala.util.Random
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.LocalSparkContext._
@@ -121,9 +123,12 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
val coordinatorRef = sqlContext.streams.stateStoreCoordinator
coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 0), "host1", "exec1")
coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 1), "host2", "exec2")
- assert(
- coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
- Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
+
+ eventually(timeout(10 seconds)) {
+ assert(
+ coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
+ Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
+ }
val rdd = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore(
increment, path, opId, storeVersion = 0, keySchema, valueSchema)