1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.io.{FileDescriptor, InputStream}
import java.lang
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.hadoop.fs._
import org.apache.spark.internal.Logging
object DebugFilesystem extends Logging {
// Stores the set of active streams and their creation sites.
private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]()
def clearOpenStreams(): Unit = {
openStreams.clear()
}
def assertNoOpenStreams(): Unit = {
val numOpen = openStreams.size()
if (numOpen > 0) {
for (exc <- openStreams.values().asScala) {
logWarning("Leaked filesystem connection created at:")
exc.printStackTrace()
}
throw new RuntimeException(s"There are $numOpen possibly leaked file streams.")
}
}
}
/**
* DebugFilesystem wraps file open calls to track all open connections. This can be used in tests
* to check that connections are not leaked.
*/
// TODO(ekl) we should consider always interposing this to expose num open conns as a metric
class DebugFilesystem extends LocalFileSystem {
import DebugFilesystem._
override def open(f: Path, bufferSize: Int): FSDataInputStream = {
val wrapped: FSDataInputStream = super.open(f, bufferSize)
openStreams.put(wrapped, new Throwable())
new FSDataInputStream(wrapped.getWrappedStream) {
override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind)
override def getWrappedStream: InputStream = wrapped.getWrappedStream
override def getFileDescriptor: FileDescriptor = wrapped.getFileDescriptor
override def getPos: Long = wrapped.getPos
override def seekToNewSource(targetPos: Long): Boolean = wrapped.seekToNewSource(targetPos)
override def seek(desired: Long): Unit = wrapped.seek(desired)
override def setReadahead(readahead: lang.Long): Unit = wrapped.setReadahead(readahead)
override def read(position: Long, buffer: Array[Byte], offset: Int, length: Int): Int =
wrapped.read(position, buffer, offset, length)
override def read(buf: ByteBuffer): Int = wrapped.read(buf)
override def readFully(position: Long, buffer: Array[Byte], offset: Int, length: Int): Unit =
wrapped.readFully(position, buffer, offset, length)
override def readFully(position: Long, buffer: Array[Byte]): Unit =
wrapped.readFully(position, buffer)
override def available(): Int = wrapped.available()
override def mark(readlimit: Int): Unit = wrapped.mark(readlimit)
override def skip(n: Long): Long = wrapped.skip(n)
override def markSupported(): Boolean = wrapped.markSupported()
override def close(): Unit = {
wrapped.close()
openStreams.remove(wrapped)
}
override def read(): Int = wrapped.read()
override def reset(): Unit = wrapped.reset()
override def toString: String = wrapped.toString
override def equals(obj: scala.Any): Boolean = wrapped.equals(obj)
override def hashCode(): Int = wrapped.hashCode()
}
}
}
|