author     Josh Rosen <joshrosen@databricks.com>   2015-03-24 14:38:20 -0700
committer  Andrew Or <andrew@databricks.com>       2015-03-24 14:38:20 -0700
commit     7215aa745590a3eec9c1ff35d28194235a550db7 (patch)
tree       d4f59accb3689715013848452a7d19958e3c5e52 /repl/src/main
parent     a8f51b82968147abebbe61b8b68b066d21a0c6e6 (diff)
[SPARK-6209] Clean up connections in ExecutorClassLoader after failing to load classes (master branch PR)
ExecutorClassLoader does not ensure proper cleanup of the network connections that it opens. If it fails to load a class, it may leak partially-consumed InputStreams that are connected to the REPL's HTTP class server, causing that server to exhaust its thread pool, which can hang the entire job. See [SPARK-6209](https://issues.apache.org/jira/browse/SPARK-6209) for more details, including a bug reproduction.

This patch fixes the issue by ensuring that these resources are properly cleaned up, and it adds logging for unexpected error cases. This PR is an extended version of #4935 that also adds a regression test.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #4944 from JoshRosen/executorclassloader-leak-master-branch and squashes the following commits:

e0e3c25 [Josh Rosen] Wrap try block around getResponseCode; re-enable keep-alive by closing error stream
961c284 [Josh Rosen] Roll back changes that were added to get the regression test to fail
7ee2261 [Josh Rosen] Add a failing regression test
e2d70a3 [Josh Rosen] Properly clean up after errors in ExecutorClassLoader
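The heart of the fix is a simple resource-safety discipline: every InputStream fetched from the class server is closed in a finally block even when readAndTransformClass throws, and a missing class file surfaces as ClassNotFoundException so the caller can fall back to the parent loader. Below is a minimal, self-contained Scala sketch of that pattern; the SafeClassFetch object and its open/read parameters are hypothetical stand-ins for illustration, not part of this patch.

import java.io.InputStream

// Hypothetical sketch (not Spark code) of the close-in-finally discipline introduced
// by this patch. "open" stands in for the HTTP / filesystem fetch helpers and
// "read" for readAndTransformClass.
object SafeClassFetch {
  def loadClassBytes(open: () => InputStream, read: InputStream => Array[Byte]): Option[Array[Byte]] = {
    var inputStream: InputStream = null
    try {
      inputStream = open()
      Some(read(inputStream))
    } catch {
      case _: ClassNotFoundException =>
        None // the class file does not exist; let the caller delegate to the parent loader
    } finally {
      if (inputStream != null) {
        try {
          inputStream.close()
        } catch {
          case _: Exception => // a failed close is logged and otherwise ignored in the patch
        }
      }
    }
  }
}

In the patch itself this shape appears in findClassLocally: getClassFileInputStreamFromFileSystem and getClassFileInputStreamFromHttpServer play the role of open, readAndTransformClass plays the role of read, and the new finally block guarantees the stream is closed, so the REPL's HTTP class server gets its connection (and thread) back even when class loading fails.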
Diffstat (limited to 'repl/src/main')
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala   85
1 file changed, 67 insertions, 18 deletions
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 9805609120..004941d5f5 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,9 +17,10 @@
 package org.apache.spark.repl
-import java.io.{ByteArrayOutputStream, InputStream, FileNotFoundException}
-import java.net.{URI, URL, URLEncoder}
-import java.util.concurrent.{Executors, ExecutorService}
+import java.io.{IOException, ByteArrayOutputStream, InputStream}
+import java.net.{HttpURLConnection, URI, URL, URLEncoder}
+
+import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -43,6 +44,9 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
   val parentLoader = new ParentClassLoader(parent)
+  // Allows HTTP connect and read timeouts to be controlled for testing / debugging purposes
+  private[repl] var httpUrlConnectionTimeoutMillis: Int = -1
+
   // Hadoop FileSystem object for our URI, if it isn't using HTTP
   var fileSystem: FileSystem = {
     if (Set("http", "https", "ftp").contains(uri.getScheme)) {
@@ -71,30 +75,66 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
     }
   }
+  private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = {
+    val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
+      val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
+      val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
+      newuri.toURL
+    } else {
+      new URL(classUri + "/" + urlEncode(pathInDirectory))
+    }
+    val connection: HttpURLConnection = Utils.setupSecureURLConnection(url.openConnection(),
+      SparkEnv.get.securityManager).asInstanceOf[HttpURLConnection]
+    // Set the connection timeouts (for testing purposes)
+    if (httpUrlConnectionTimeoutMillis != -1) {
+      connection.setConnectTimeout(httpUrlConnectionTimeoutMillis)
+      connection.setReadTimeout(httpUrlConnectionTimeoutMillis)
+    }
+    connection.connect()
+    try {
+      if (connection.getResponseCode != 200) {
+        // Close the error stream so that the connection is eligible for re-use
+        try {
+          connection.getErrorStream.close()
+        } catch {
+          case ioe: IOException =>
+            logError("Exception while closing error stream", ioe)
+        }
+        throw new ClassNotFoundException(s"Class file not found at URL $url")
+      } else {
+        connection.getInputStream
+      }
+    } catch {
+      case NonFatal(e) if !e.isInstanceOf[ClassNotFoundException] =>
+        connection.disconnect()
+        throw e
+    }
+  }
+
+  private def getClassFileInputStreamFromFileSystem(pathInDirectory: String): InputStream = {
+    val path = new Path(directory, pathInDirectory)
+    if (fileSystem.exists(path)) {
+      fileSystem.open(path)
+    } else {
+      throw new ClassNotFoundException(s"Class file not found at path $path")
+    }
+  }
+
   def findClassLocally(name: String): Option[Class[_]] = {
+    val pathInDirectory = name.replace('.', '/') + ".class"
+    var inputStream: InputStream = null
     try {
-      val pathInDirectory = name.replace('.', '/') + ".class"
-      val inputStream = {
+      inputStream = {
         if (fileSystem != null) {
-          fileSystem.open(new Path(directory, pathInDirectory))
+          getClassFileInputStreamFromFileSystem(pathInDirectory)
         } else {
-          val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
-            val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
-            val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager)
-            newuri.toURL
-          } else {
-            new URL(classUri + "/" + urlEncode(pathInDirectory))
-          }
-
-          Utils.setupSecureURLConnection(url.openConnection(), SparkEnv.get.securityManager)
-            .getInputStream
+          getClassFileInputStreamFromHttpServer(pathInDirectory)
         }
       }
       val bytes = readAndTransformClass(name, inputStream)
-      inputStream.close()
       Some(defineClass(name, bytes, 0, bytes.length))
     } catch {
-      case e: FileNotFoundException =>
+      case e: ClassNotFoundException =>
        // We did not find the class
        logDebug(s"Did not load class $name from REPL class server at $uri", e)
        None
@@ -102,6 +142,15 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
        // Something bad happened while checking if the class exists
        logError(s"Failed to check existence of class $name on REPL class server at $uri", e)
        None
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close()
+        } catch {
+          case e: Exception =>
+            logError("Exception while closing inputStream", e)
+        }
+      }
+    }
     }
   }