diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-04-01 15:00:38 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-04-01 15:00:38 -0700 |
commit | 0b7d4966ca7e02f351c4b92a74789cef4799fcb1 (patch) | |
tree | ca899869e31b7d717f22284f834d468394e7e971 /sql/core/src/test | |
parent | e41acb757327e3226ffe312766ec759c16616588 (diff) | |
download | spark-0b7d4966ca7e02f351c4b92a74789cef4799fcb1.tar.gz spark-0b7d4966ca7e02f351c4b92a74789cef4799fcb1.tar.bz2 spark-0b7d4966ca7e02f351c4b92a74789cef4799fcb1.zip |
[SPARK-14316][SQL] StateStoreCoordinator should extend ThreadSafeRpcEndpoint
## What changes were proposed in this pull request?
RpcEndpoint is not thread safe and allows multiple messages to be processed at the same time. StateStoreCoordinator should use ThreadSafeRpcEndpoint.
## How was this patch tested?
Existing unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12100 from zsxwing/fix-StateStoreCoordinator.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala | 8 |
1 files changed, 3 insertions, 5 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala index df50cbde56..85db05157c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala @@ -124,11 +124,9 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 0), "host1", "exec1") coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 1), "host2", "exec2") - eventually(timeout(10 seconds)) { - assert( - coordinatorRef.getLocation(StateStoreId(path, opId, 0)) === - Some(ExecutorCacheTaskLocation("host1", "exec1").toString)) - } + assert( + coordinatorRef.getLocation(StateStoreId(path, opId, 0)) === + Some(ExecutorCacheTaskLocation("host1", "exec1").toString)) val rdd = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore( increment, path, opId, storeVersion = 0, keySchema, valueSchema) |