aboutsummaryrefslogtreecommitdiff
path: root/repl/src/test
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-03-24 14:38:20 -0700
committerAndrew Or <andrew@databricks.com>2015-03-24 14:38:20 -0700
commit7215aa745590a3eec9c1ff35d28194235a550db7 (patch)
treed4f59accb3689715013848452a7d19958e3c5e52 /repl/src/test
parenta8f51b82968147abebbe61b8b68b066d21a0c6e6 (diff)
downloadspark-7215aa745590a3eec9c1ff35d28194235a550db7.tar.gz
spark-7215aa745590a3eec9c1ff35d28194235a550db7.tar.bz2
spark-7215aa745590a3eec9c1ff35d28194235a550db7.zip
[SPARK-6209] Clean up connections in ExecutorClassLoader after failing to load classes (master branch PR)
ExecutorClassLoader does not ensure proper cleanup of network connections that it opens. If it fails to load a class, it may leak partially-consumed InputStreams that are connected to the REPL's HTTP class server, causing that server to exhaust its thread pool, which can cause the entire job to hang. See [SPARK-6209](https://issues.apache.org/jira/browse/SPARK-6209) for more details, including a bug reproduction. This patch fixes this issue by ensuring proper cleanup of these resources. It also adds logging for unexpected error cases. This PR is an extended version of #4935 and adds a regression test. Author: Josh Rosen <joshrosen@databricks.com> Closes #4944 from JoshRosen/executorclassloader-leak-master-branch and squashes the following commits: e0e3c25 [Josh Rosen] Wrap try block around getReponseCode; re-enable keep-alive by closing error stream 961c284 [Josh Rosen] Roll back changes that were added to get the regression test to fail 7ee2261 [Josh Rosen] Add a failing regression test e2d70a3 [Josh Rosen] Properly clean up after errors in ExecutorClassLoader
Diffstat (limited to 'repl/src/test')
-rw-r--r--repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala70
1 file changed, 68 insertions, 2 deletions
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 6a79e76a34..c709cde740 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -20,13 +20,25 @@ package org.apache.spark.repl
import java.io.File
import java.net.{URL, URLClassLoader}
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
+import org.scalatest.concurrent.Interruptor
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito._
-import org.apache.spark.{SparkConf, TestUtils}
+import org.apache.spark._
import org.apache.spark.util.Utils
-class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
+class ExecutorClassLoaderSuite
+ extends FunSuite
+ with BeforeAndAfterAll
+ with MockitoSugar
+ with Logging {
val childClassNames = List("ReplFakeClass1", "ReplFakeClass2")
val parentClassNames = List("ReplFakeClass1", "ReplFakeClass2", "ReplFakeClass3")
@@ -34,6 +46,7 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
var tempDir2: File = _
var url1: String = _
var urls2: Array[URL] = _
+ var classServer: HttpServer = _
override def beforeAll() {
super.beforeAll()
@@ -47,8 +60,12 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
override def afterAll() {
super.afterAll()
+ if (classServer != null) {
+ classServer.stop()
+ }
Utils.deleteRecursively(tempDir1)
Utils.deleteRecursively(tempDir2)
+ SparkEnv.set(null)
}
test("child first") {
@@ -83,4 +100,53 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
}
}
+ test("failing to fetch classes from HTTP server should not leak resources (SPARK-6209)") {
+ // This is a regression test for SPARK-6209, a bug where each failed attempt to load a class
+ // from the driver's class server would leak an HTTP connection, causing the class server's
+ // thread / connection pool to be exhausted.
+ val conf = new SparkConf()
+ val securityManager = new SecurityManager(conf)
+ classServer = new HttpServer(conf, tempDir1, securityManager)
+ classServer.start()
+ // ExecutorClassLoader uses SparkEnv's SecurityManager, so we need to mock this
+ val mockEnv = mock[SparkEnv]
+ when(mockEnv.securityManager).thenReturn(securityManager)
+ SparkEnv.set(mockEnv)
+ // Create an ExecutorClassLoader that's configured to load classes from the HTTP server
+ val parentLoader = new URLClassLoader(Array.empty, null)
+ val classLoader = new ExecutorClassLoader(conf, classServer.uri, parentLoader, false)
+ classLoader.httpUrlConnectionTimeoutMillis = 500
+ // Check that this class loader can actually load classes that exist
+ val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
+ val fakeClassVersion = fakeClass.toString
+ assert(fakeClassVersion === "1")
+ // Try to perform a full GC now, since GC during the test might mask resource leaks
+ System.gc()
+ // When the original bug occurs, the test thread becomes blocked in a classloading call
+ // and does not respond to interrupts. Therefore, use a custom ScalaTest interruptor to
+ // shut down the HTTP server when the test times out
+ val interruptor: Interruptor = new Interruptor {
+ override def apply(thread: Thread): Unit = {
+ classServer.stop()
+ classServer = null
+ thread.interrupt()
+ }
+ }
+ def tryAndFailToLoadABunchOfClasses(): Unit = {
+ // The number of trials here should be much larger than Jetty's thread / connection limit
+ // in order to expose thread or connection leaks
+ for (i <- 1 to 1000) {
+ if (Thread.currentThread().isInterrupted) {
+ throw new InterruptedException()
+ }
+ // Incorporate the iteration number into the class name in order to avoid any response
+ // caching that might be added in the future
+ intercept[ClassNotFoundException] {
+ classLoader.loadClass(s"ReplFakeClassDoesNotExist$i").newInstance()
+ }
+ }
+ }
+ failAfter(10 seconds)(tryAndFailToLoadABunchOfClasses())(interruptor)
+ }
+
}