From 0b7d4966ca7e02f351c4b92a74789cef4799fcb1 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Fri, 1 Apr 2016 15:00:38 -0700
Subject: [SPARK-14316][SQL] StateStoreCoordinator should extend ThreadSafeRpcEndpoint

## What changes were proposed in this pull request?

RpcEndpoint is not thread-safe and allows multiple messages to be processed at the same time. Since StateStoreCoordinator tracks its loaded instances in an unsynchronized mutable.HashMap, it should extend ThreadSafeRpcEndpoint so that its messages are handled one at a time.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu

Closes #12100 from zsxwing/fix-StateStoreCoordinator.
---
 .../sql/execution/streaming/state/StateStoreCoordinator.scala    | 4 ++--
 .../spark/sql/execution/streaming/state/StateStoreRDDSuite.scala | 8 +++-----
 2 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 5aa0636850..812e1b0a39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.util.RpcUtils
 
@@ -112,7 +112,7 @@ private[sql] class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointR
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
   private val instances = new mutable.HashMap[StateStoreId, ExecutorCacheTaskLocation]
 
   override def receive: PartialFunction[Any, Unit] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
index df50cbde56..85db05157c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
@@ -124,11 +124,9 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
       coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 0), "host1", "exec1")
       coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 1), "host2", "exec2")
 
-      eventually(timeout(10 seconds)) {
-        assert(
-          coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
-            Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
-      }
+      assert(
+        coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
+          Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
 
       val rdd = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore(
         increment, path, opId, storeVersion = 0, keySchema, valueSchema)
-- 
cgit v1.2.3
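
ThreadSafeRpcEndpoint guarantees that messages are processed one at a time, in arrival order, whereas a plain RpcEndpoint may have its receive handler invoked from several dispatcher threads concurrently. Below is a minimal standalone Scala sketch of that guarantee; it is not Spark code, and every name in it (SerializedEndpointSketch, the Int-keyed map, the shape of reportActiveInstance) is invented for illustration. A single-threaded executor stands in for the serialized inbox, which is what makes an unsynchronized mutable.HashMap like the coordinator's safe.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

import scala.collection.mutable

object SerializedEndpointSketch {

  // Shared, unsynchronized state, analogous to StateStoreCoordinator's
  // instances map.
  private val instances = new mutable.HashMap[Int, String]

  // A single thread drains the "inbox", mimicking ThreadSafeRpcEndpoint's
  // promise that the message handler is never invoked concurrently.
  private val inbox = Executors.newSingleThreadExecutor()

  // Fire-and-forget message, analogous to a send() to the coordinator.
  def reportActiveInstance(id: Int, location: String): Unit =
    inbox.execute(() => instances(id) = location)

  def main(args: Array[String]): Unit = {
    val callers = Executors.newFixedThreadPool(8)
    val submitted = new CountDownLatch(1000)
    for (i <- 0 until 1000) {
      callers.execute { () =>
        reportActiveInstance(i % 10, s"exec-$i") // many concurrent senders
        submitted.countDown()
      }
    }
    submitted.await()
    callers.shutdown()
    // Draining the inbox before reading plays the role a later ask() plays
    // against a serialized endpoint: every earlier message is applied first.
    inbox.shutdown()
    inbox.awaitTermination(10, TimeUnit.SECONDS)
    println(s"distinct store ids tracked: ${instances.size}") // always 10
  }
}
```

The same ordering guarantee appears to be what lets the test change above drop its eventually(timeout(10 seconds)) wrapper: by the time the getLocation ask is answered, the earlier reportActiveInstance sends to the same endpoint have already been processed.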