aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-11-20 14:31:26 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-11-20 14:31:26 -0800
commit89fd9bd06160fa89dedbf685bfe159ffe4a06ec6 (patch)
tree1ae829b6427f41b226f90ab7eddc5094d95d71b9 /core
parentbe7a2cfd978143f6f265eca63e9e24f755bc9f22 (diff)
downloadspark-89fd9bd06160fa89dedbf685bfe159ffe4a06ec6.tar.gz
spark-89fd9bd06160fa89dedbf685bfe159ffe4a06ec6.tar.bz2
spark-89fd9bd06160fa89dedbf685bfe159ffe4a06ec6.zip
[SPARK-11887] Close PersistenceEngine at the end of PersistenceEngineSuite tests
In PersistenceEngineSuite, we do not call `close()` on the PersistenceEngine at the end of the test. For the ZooKeeperPersistenceEngine, this causes us to leak a ZooKeeper client, causing the logs of unrelated tests to be periodically spammed with connection error messages from that client: ``` 15/11/20 05:13:35.789 pool-1-thread-1-ScalaTest-running-PersistenceEngineSuite-SendThread(localhost:15741) INFO ClientCnxn: Opening socket connection to server localhost/127.0.0.1:15741. Will not attempt to authenticate using SASL (unknown error) 15/11/20 05:13:35.790 pool-1-thread-1-ScalaTest-running-PersistenceEngineSuite-SendThread(localhost:15741) WARN ClientCnxn: Session 0x15124ff48dd0000 for server null, unexpected error, closing socket connection and attempting reconnect java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068) ``` This patch fixes this by using a `finally` block. Author: Josh Rosen <joshrosen@databricks.com> Closes #9864 from JoshRosen/close-zookeeper-client-in-tests.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala100
1 files changed, 52 insertions, 48 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 34775577de..7a44728675 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -63,56 +63,60 @@ class PersistenceEngineSuite extends SparkFunSuite {
conf: SparkConf, persistenceEngineCreator: Serializer => PersistenceEngine): Unit = {
val serializer = new JavaSerializer(conf)
val persistenceEngine = persistenceEngineCreator(serializer)
- persistenceEngine.persist("test_1", "test_1_value")
- assert(Seq("test_1_value") === persistenceEngine.read[String]("test_"))
- persistenceEngine.persist("test_2", "test_2_value")
- assert(Set("test_1_value", "test_2_value") === persistenceEngine.read[String]("test_").toSet)
- persistenceEngine.unpersist("test_1")
- assert(Seq("test_2_value") === persistenceEngine.read[String]("test_"))
- persistenceEngine.unpersist("test_2")
- assert(persistenceEngine.read[String]("test_").isEmpty)
-
- // Test deserializing objects that contain RpcEndpointRef
- val testRpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
try {
- // Create a real endpoint so that we can test RpcEndpointRef deserialization
- val workerEndpoint = testRpcEnv.setupEndpoint("worker", new RpcEndpoint {
- override val rpcEnv: RpcEnv = testRpcEnv
- })
-
- val workerToPersist = new WorkerInfo(
- id = "test_worker",
- host = "127.0.0.1",
- port = 10000,
- cores = 0,
- memory = 0,
- endpoint = workerEndpoint,
- webUiPort = 0,
- publicAddress = ""
- )
-
- persistenceEngine.addWorker(workerToPersist)
-
- val (storedApps, storedDrivers, storedWorkers) =
- persistenceEngine.readPersistedData(testRpcEnv)
-
- assert(storedApps.isEmpty)
- assert(storedDrivers.isEmpty)
-
- // Check deserializing WorkerInfo
- assert(storedWorkers.size == 1)
- val recoveryWorkerInfo = storedWorkers.head
- assert(workerToPersist.id === recoveryWorkerInfo.id)
- assert(workerToPersist.host === recoveryWorkerInfo.host)
- assert(workerToPersist.port === recoveryWorkerInfo.port)
- assert(workerToPersist.cores === recoveryWorkerInfo.cores)
- assert(workerToPersist.memory === recoveryWorkerInfo.memory)
- assert(workerToPersist.endpoint === recoveryWorkerInfo.endpoint)
- assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort)
- assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress)
+ persistenceEngine.persist("test_1", "test_1_value")
+ assert(Seq("test_1_value") === persistenceEngine.read[String]("test_"))
+ persistenceEngine.persist("test_2", "test_2_value")
+ assert(Set("test_1_value", "test_2_value") === persistenceEngine.read[String]("test_").toSet)
+ persistenceEngine.unpersist("test_1")
+ assert(Seq("test_2_value") === persistenceEngine.read[String]("test_"))
+ persistenceEngine.unpersist("test_2")
+ assert(persistenceEngine.read[String]("test_").isEmpty)
+
+ // Test deserializing objects that contain RpcEndpointRef
+ val testRpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+ try {
+ // Create a real endpoint so that we can test RpcEndpointRef deserialization
+ val workerEndpoint = testRpcEnv.setupEndpoint("worker", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = testRpcEnv
+ })
+
+ val workerToPersist = new WorkerInfo(
+ id = "test_worker",
+ host = "127.0.0.1",
+ port = 10000,
+ cores = 0,
+ memory = 0,
+ endpoint = workerEndpoint,
+ webUiPort = 0,
+ publicAddress = ""
+ )
+
+ persistenceEngine.addWorker(workerToPersist)
+
+ val (storedApps, storedDrivers, storedWorkers) =
+ persistenceEngine.readPersistedData(testRpcEnv)
+
+ assert(storedApps.isEmpty)
+ assert(storedDrivers.isEmpty)
+
+ // Check deserializing WorkerInfo
+ assert(storedWorkers.size == 1)
+ val recoveryWorkerInfo = storedWorkers.head
+ assert(workerToPersist.id === recoveryWorkerInfo.id)
+ assert(workerToPersist.host === recoveryWorkerInfo.host)
+ assert(workerToPersist.port === recoveryWorkerInfo.port)
+ assert(workerToPersist.cores === recoveryWorkerInfo.cores)
+ assert(workerToPersist.memory === recoveryWorkerInfo.memory)
+ assert(workerToPersist.endpoint === recoveryWorkerInfo.endpoint)
+ assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort)
+ assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress)
+ } finally {
+ testRpcEnv.shutdown()
+ testRpcEnv.awaitTermination()
+ }
} finally {
- testRpcEnv.shutdown()
- testRpcEnv.awaitTermination()
+ persistenceEngine.close()
}
}