aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorBogdan Raducanu <bogdan@databricks.com>2017-04-10 17:34:15 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2017-04-10 17:34:15 +0200
commit4f7d49b955b8c362da29a2540697240f4564d3ee (patch)
treeaedd9f0e77294fe5387a89e3a3d8037b85785811 /core
parent3d7f201f2adc2d33be6f564fa76435c18552f4ba (diff)
downloadspark-4f7d49b955b8c362da29a2540697240f4564d3ee.tar.gz
spark-4f7d49b955b8c362da29a2540697240f4564d3ee.tar.bz2
spark-4f7d49b955b8c362da29a2540697240f4564d3ee.zip
[SPARK-20243][TESTS] DebugFilesystem.assertNoOpenStreams thread race
## What changes were proposed in this pull request?
Synchronize access to the openStreams map.

## How was this patch tested?
Existing tests.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #17592 from bogdanrdc/SPARK-20243.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/org/apache/spark/DebugFilesystem.scala26
1 file changed, 16 insertions, 10 deletions
diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
index 72aea84111..91355f7362 100644
--- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
+++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
@@ -20,7 +20,6 @@ package org.apache.spark
import java.io.{FileDescriptor, InputStream}
import java.lang
import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -31,21 +30,29 @@ import org.apache.spark.internal.Logging
object DebugFilesystem extends Logging {
// Stores the set of active streams and their creation sites.
- private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]()
+ private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable]
- def clearOpenStreams(): Unit = {
+ def addOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
+ openStreams.put(stream, new Throwable())
+ }
+
+ def clearOpenStreams(): Unit = openStreams.synchronized {
openStreams.clear()
}
- def assertNoOpenStreams(): Unit = {
- val numOpen = openStreams.size()
+ def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
+ openStreams.remove(stream)
+ }
+
+ def assertNoOpenStreams(): Unit = openStreams.synchronized {
+ val numOpen = openStreams.values.size
if (numOpen > 0) {
- for (exc <- openStreams.values().asScala) {
+ for (exc <- openStreams.values) {
logWarning("Leaked filesystem connection created at:")
exc.printStackTrace()
}
throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.",
- openStreams.values().asScala.head)
+ openStreams.values.head)
}
}
}
@@ -60,8 +67,7 @@ class DebugFilesystem extends LocalFileSystem {
override def open(f: Path, bufferSize: Int): FSDataInputStream = {
val wrapped: FSDataInputStream = super.open(f, bufferSize)
- openStreams.put(wrapped, new Throwable())
-
+ addOpenStream(wrapped)
new FSDataInputStream(wrapped.getWrappedStream) {
override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind)
@@ -98,7 +104,7 @@ class DebugFilesystem extends LocalFileSystem {
override def close(): Unit = {
wrapped.close()
- openStreams.remove(wrapped)
+ removeOpenStream(wrapped)
}
override def read(): Int = wrapped.read()