diff options
author | Bogdan Raducanu <bogdan@databricks.com> | 2017-04-10 17:34:15 +0200 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2017-04-10 17:34:15 +0200 |
commit | 4f7d49b955b8c362da29a2540697240f4564d3ee (patch) | |
tree | aedd9f0e77294fe5387a89e3a3d8037b85785811 /core/src | |
parent | 3d7f201f2adc2d33be6f564fa76435c18552f4ba (diff) | |
download | spark-4f7d49b955b8c362da29a2540697240f4564d3ee.tar.gz spark-4f7d49b955b8c362da29a2540697240f4564d3ee.tar.bz2 spark-4f7d49b955b8c362da29a2540697240f4564d3ee.zip |
[SPARK-20243][TESTS] DebugFilesystem.assertNoOpenStreams thread race
## What changes were proposed in this pull request?
Synchronize access to openStreams map.
## How was this patch tested?
Existing tests.
Author: Bogdan Raducanu <bogdan@databricks.com>
Closes #17592 from bogdanrdc/SPARK-20243.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/test/scala/org/apache/spark/DebugFilesystem.scala | 26 |
1 files changed, 16 insertions, 10 deletions
diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala index 72aea84111..91355f7362 100644 --- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala +++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala @@ -20,7 +20,6 @@ package org.apache.spark import java.io.{FileDescriptor, InputStream} import java.lang import java.nio.ByteBuffer -import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable @@ -31,21 +30,29 @@ import org.apache.spark.internal.Logging object DebugFilesystem extends Logging { // Stores the set of active streams and their creation sites. - private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]() + private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable] - def clearOpenStreams(): Unit = { + def addOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized { + openStreams.put(stream, new Throwable()) + } + + def clearOpenStreams(): Unit = openStreams.synchronized { openStreams.clear() } - def assertNoOpenStreams(): Unit = { - val numOpen = openStreams.size() + def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized { + openStreams.remove(stream) + } + + def assertNoOpenStreams(): Unit = openStreams.synchronized { + val numOpen = openStreams.values.size if (numOpen > 0) { - for (exc <- openStreams.values().asScala) { + for (exc <- openStreams.values) { logWarning("Leaked filesystem connection created at:") exc.printStackTrace() } throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.", - openStreams.values().asScala.head) + openStreams.values.head) } } } @@ -60,8 +67,7 @@ class DebugFilesystem extends LocalFileSystem { override def open(f: Path, bufferSize: Int): FSDataInputStream = { val wrapped: FSDataInputStream = super.open(f, bufferSize) - openStreams.put(wrapped, new Throwable()) - + addOpenStream(wrapped) new FSDataInputStream(wrapped.getWrappedStream) { override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind) @@ -98,7 +104,7 @@ class DebugFilesystem extends LocalFileSystem { override def close(): Unit = { wrapped.close() - openStreams.remove(wrapped) + removeOpenStream(wrapped) } override def read(): Int = wrapped.read() |