aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-04-01 15:00:38 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-04-01 15:00:38 -0700
commit0b7d4966ca7e02f351c4b92a74789cef4799fcb1 (patch)
treeca899869e31b7d717f22284f834d468394e7e971 /sql/core/src/test
parente41acb757327e3226ffe312766ec759c16616588 (diff)
downloadspark-0b7d4966ca7e02f351c4b92a74789cef4799fcb1.tar.gz
spark-0b7d4966ca7e02f351c4b92a74789cef4799fcb1.tar.bz2
spark-0b7d4966ca7e02f351c4b92a74789cef4799fcb1.zip
[SPARK-14316][SQL] StateStoreCoordinator should extend ThreadSafeRpcEndpoint
## What changes were proposed in this pull request? RpcEndpoint is not thread safe and allows multiple messages to be processed at the same time. StateStoreCoordinator should use ThreadSafeRpcEndpoint. ## How was this patch tested? Existing unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #12100 from zsxwing/fix-StateStoreCoordinator.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala8
1 files changed, 3 insertions, 5 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
index df50cbde56..85db05157c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
@@ -124,11 +124,9 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 0), "host1", "exec1")
coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 1), "host2", "exec2")
- eventually(timeout(10 seconds)) {
- assert(
- coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
- Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
- }
+ assert(
+ coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
+ Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
val rdd = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore(
increment, path, opId, storeVersion = 0, keySchema, valueSchema)