aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-09-30 23:51:36 -0700
committerReynold Xin <rxin@databricks.com>2016-09-30 23:51:36 -0700
commit4bcd9b728b8df74756d16b27725c2db7c523d4b2 (patch)
tree4e91f316bcd44cb852d9d9fd80c1e470da8ab673 /core
parent15e9bbb49e00b3982c428d39776725d0dea2cdfa (diff)
downloadspark-4bcd9b728b8df74756d16b27725c2db7c523d4b2.tar.gz
spark-4bcd9b728b8df74756d16b27725c2db7c523d4b2.tar.bz2
spark-4bcd9b728b8df74756d16b27725c2db7c523d4b2.zip
[SPARK-17740] Spark tests should mock / interpose HDFS to ensure that streams are closed
## What changes were proposed in this pull request? As a followup to SPARK-17666, ensure filesystem connections are not leaked at least in unit tests. This is done here by intercepting filesystem calls as suggested by JoshRosen . At the end of each test, we assert no filesystem streams are left open. This applies to all tests using SharedSQLContext or SharedSparkContext. ## How was this patch tested? I verified that tests in sql and core are indeed using the filesystem backend, and fixed the detected leaks. I also checked that reverting https://github.com/apache/spark/pull/15245 causes many actual test failures due to connection leaks. Author: Eric Liang <ekl@databricks.com> Author: Eric Liang <ekhliang@gmail.com> Closes #15306 from ericl/sc-4672.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/DebugFilesystem.scala114
-rw-r--r--core/src/test/scala/org/apache/spark/SharedSparkContext.scala17
2 files changed, 128 insertions, 3 deletions
diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
new file mode 100644
index 0000000000..fb8d701ebd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.{FileDescriptor, InputStream}
+import java.lang
+import java.nio.ByteBuffer
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.fs._
+
+import org.apache.spark.internal.Logging
+
+object DebugFilesystem extends Logging {
+ // Stores the set of active streams and their creation sites.
+ private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]()
+
+ def clearOpenStreams(): Unit = {
+ openStreams.clear()
+ }
+
+ def assertNoOpenStreams(): Unit = {
+ val numOpen = openStreams.size()
+ if (numOpen > 0) {
+ for (exc <- openStreams.values().asScala) {
+ logWarning("Leaked filesystem connection created at:")
+ exc.printStackTrace()
+ }
+ throw new RuntimeException(s"There are $numOpen possibly leaked file streams.")
+ }
+ }
+}
+
+/**
+ * DebugFilesystem wraps file open calls to track all open connections. This can be used in tests
+ * to check that connections are not leaked.
+ */
+// TODO(ekl) we should consider always interposing this to expose num open conns as a metric
+class DebugFilesystem extends LocalFileSystem {
+ import DebugFilesystem._
+
+ override def open(f: Path, bufferSize: Int): FSDataInputStream = {
+ val wrapped: FSDataInputStream = super.open(f, bufferSize)
+ openStreams.put(wrapped, new Throwable())
+
+ new FSDataInputStream(wrapped.getWrappedStream) {
+ override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind)
+
+ override def getWrappedStream: InputStream = wrapped.getWrappedStream
+
+ override def getFileDescriptor: FileDescriptor = wrapped.getFileDescriptor
+
+ override def getPos: Long = wrapped.getPos
+
+ override def seekToNewSource(targetPos: Long): Boolean = wrapped.seekToNewSource(targetPos)
+
+ override def seek(desired: Long): Unit = wrapped.seek(desired)
+
+ override def setReadahead(readahead: lang.Long): Unit = wrapped.setReadahead(readahead)
+
+ override def read(position: Long, buffer: Array[Byte], offset: Int, length: Int): Int =
+ wrapped.read(position, buffer, offset, length)
+
+ override def read(buf: ByteBuffer): Int = wrapped.read(buf)
+
+ override def readFully(position: Long, buffer: Array[Byte], offset: Int, length: Int): Unit =
+ wrapped.readFully(position, buffer, offset, length)
+
+ override def readFully(position: Long, buffer: Array[Byte]): Unit =
+ wrapped.readFully(position, buffer)
+
+ override def available(): Int = wrapped.available()
+
+ override def mark(readlimit: Int): Unit = wrapped.mark(readlimit)
+
+ override def skip(n: Long): Long = wrapped.skip(n)
+
+ override def markSupported(): Boolean = wrapped.markSupported()
+
+ override def close(): Unit = {
+ wrapped.close()
+ openStreams.remove(wrapped)
+ }
+
+ override def read(): Int = wrapped.read()
+
+ override def reset(): Unit = wrapped.reset()
+
+ override def toString: String = wrapped.toString
+
+ override def equals(obj: scala.Any): Boolean = wrapped.equals(obj)
+
+ override def hashCode(): Int = wrapped.hashCode()
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 858bc742e0..6aedcb1271 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -17,11 +17,11 @@
package org.apache.spark
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.Suite
/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */
-trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
+trait SharedSparkContext extends BeforeAndAfterAll with BeforeAndAfterEach { self: Suite =>
@transient private var _sc: SparkContext = _
@@ -31,7 +31,8 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
override def beforeAll() {
super.beforeAll()
- _sc = new SparkContext("local[4]", "test", conf)
+ _sc = new SparkContext(
+ "local[4]", "test", conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
}
override def afterAll() {
@@ -42,4 +43,14 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
super.afterAll()
}
}
+
+ protected override def beforeEach(): Unit = {
+ super.beforeEach()
+ DebugFilesystem.clearOpenStreams()
+ }
+
+ protected override def afterEach(): Unit = {
+ super.afterEach()
+ DebugFilesystem.assertNoOpenStreams()
+ }
}