author    Hyukjin Kwon <gurwls223@gmail.com>          2016-11-07 12:47:39 -0800
committer Mridul Muralidharan <mridul@gmail.com>      2016-11-07 12:47:39 -0800
commit    8f0ea011a7294679ec4275b2fef349ef45b6eb81 (patch)
tree      9599da11d56741c69b408771fa59255541f0e57d /core/src
parent    0d95662e7fff26669d4f70e88fdac7a4128a4f49 (diff)
[SPARK-14914][CORE] Fix Resource not closed after using, mostly for unit tests
## What changes were proposed in this pull request?

Close `FileStream`s, `ZipFile`s, etc. to release the resources after use. Not closing these resources causes an IOException to be raised while deleting temp files.

## How was this patch tested?

Existing tests.

Author: U-FAREAST\tl <tl@microsoft.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Tao LI <tl@microsoft.com>

Closes #15618 from HyukjinKwon/SPARK-14914-1.
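For readers skimming the diff below, here is a minimal, self-contained sketch of the close-on-finally pattern this patch applies. The helper below is a simplified stand-in for Spark's `private[spark]` `Utils.tryWithSafeFinally`, not the actual implementation, and the file path is purely illustrative:

```scala
import java.io.{BufferedInputStream, FileInputStream}

object CloseOnFinallySketch {
  // Simplified stand-in for Utils.tryWithSafeFinally: run `block`, always run
  // `finallyBlock`, and if both throw, attach the finally-block exception as
  // suppressed so the original error is not masked.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        case t: Throwable if original != null => original.addSuppressed(t)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Nested use, as in the patch: the outer block owns the raw stream, the
    // inner block owns the wrapper, and each is closed exactly once even if
    // reading fails. "data.bin" is just an illustrative path.
    val fileIn = new FileInputStream("data.bin")
    val firstByte = tryWithSafeFinally {
      val buffered = new BufferedInputStream(fileIn)
      tryWithSafeFinally {
        buffered.read()
      } {
        buffered.close()
      }
    } {
      fileIn.close()
    }
    println(s"first byte: $firstByte")
  }
}
```

Closing the wrapper usually closes the underlying stream as well; the outer block still closes the raw stream explicitly so it is released even when constructing the wrapper itself throws.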
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala               13
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala                                13
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala                35
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala     8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala      26
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala           7
6 files changed, 66 insertions, 36 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index eac901d100..9f800e3a09 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -239,12 +239,17 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
val fileInputStream = fs.open(partitionerFilePath, bufferSize)
val serializer = SparkEnv.get.serializer.newInstance()
- val deserializeStream = serializer.deserializeStream(fileInputStream)
- val partitioner = Utils.tryWithSafeFinally[Partitioner] {
- deserializeStream.readObject[Partitioner]
+ val partitioner = Utils.tryWithSafeFinally {
+ val deserializeStream = serializer.deserializeStream(fileInputStream)
+ Utils.tryWithSafeFinally {
+ deserializeStream.readObject[Partitioner]
+ } {
+ deserializeStream.close()
+ }
} {
- deserializeStream.close()
+ fileInputStream.close()
}
+
logDebug(s"Read partitioner from $partitionerFilePath")
Some(partitioner)
} catch {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index cc52bb1d23..89f0b1cb5b 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -58,10 +58,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
nums.saveAsTextFile(outputDir)
// Read the plain text file and check it's OK
val outputFile = new File(outputDir, "part-00000")
- val content = Source.fromFile(outputFile).mkString
- assert(content === "1\n2\n3\n4\n")
- // Also try reading it in as a text file RDD
- assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+ val bufferSrc = Source.fromFile(outputFile)
+ Utils.tryWithSafeFinally {
+ val content = bufferSrc.mkString
+ assert(content === "1\n2\n3\n4\n")
+ // Also try reading it in as a text file RDD
+ assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+ } {
+ bufferSrc.close()
+ }
}
test("text files (compressed)") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 13cba94578..005587051b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, Utils}
class RPackageUtilsSuite
extends SparkFunSuite
@@ -74,9 +74,13 @@ class RPackageUtilsSuite
val deps = Seq(dep1, dep2).mkString(",")
IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
- assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
- assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
- assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+ Utils.tryWithSafeFinally {
+ assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
+ assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
+ assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+ } {
+ jars.foreach(_.close())
+ }
}
}
@@ -131,7 +135,7 @@ class RPackageUtilsSuite
test("SparkR zipping works properly") {
val tempDir = Files.createTempDir()
- try {
+ Utils.tryWithSafeFinally {
IvyTestUtils.writeFile(tempDir, "test.R", "abc")
val fakeSparkRDir = new File(tempDir, "SparkR")
assert(fakeSparkRDir.mkdirs())
@@ -144,14 +148,19 @@ class RPackageUtilsSuite
IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
assert(finalZip.exists())
- val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq
- assert(entries.contains("/test.R"))
- assert(entries.contains("/SparkR/abc.R"))
- assert(entries.contains("/SparkR/DESCRIPTION"))
- assert(!entries.contains("/package.zip"))
- assert(entries.contains("/packageTest/def.R"))
- assert(entries.contains("/packageTest/DESCRIPTION"))
- } finally {
+ val zipFile = new ZipFile(finalZip)
+ Utils.tryWithSafeFinally {
+ val entries = zipFile.entries().asScala.map(_.getName).toSeq
+ assert(entries.contains("/test.R"))
+ assert(entries.contains("/SparkR/abc.R"))
+ assert(entries.contains("/SparkR/DESCRIPTION"))
+ assert(!entries.contains("/package.zip"))
+ assert(entries.contains("/packageTest/def.R"))
+ assert(entries.contains("/packageTest/DESCRIPTION"))
+ } {
+ zipFile.close()
+ }
+ } {
FileUtils.deleteDirectory(tempDir)
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index a5eda7b5a5..2c41c432d1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -449,8 +449,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
val bstream = new BufferedOutputStream(cstream)
if (isNewFormat) {
- EventLoggingListener.initEventLog(new FileOutputStream(file))
+ val newFormatStream = new FileOutputStream(file)
+ Utils.tryWithSafeFinally {
+ EventLoggingListener.initEventLog(newFormatStream)
+ } {
+ newFormatStream.close()
+ }
}
+
val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
Utils.tryWithSafeFinally {
events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 7f4859206e..8a5ec37eeb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -202,8 +202,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// Make sure expected events exist in the log file.
val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
- val logStart = SparkListenerLogStart(SPARK_VERSION)
- val lines = readLines(logData)
val eventSet = mutable.Set(
SparkListenerApplicationStart,
SparkListenerBlockManagerAdded,
@@ -216,19 +214,25 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
SparkListenerTaskStart,
SparkListenerTaskEnd,
SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
- lines.foreach { line =>
- eventSet.foreach { event =>
- if (line.contains(event)) {
- val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
- val eventType = Utils.getFormattedClassName(parsedEvent)
- if (eventType == event) {
- eventSet.remove(event)
+ Utils.tryWithSafeFinally {
+ val logStart = SparkListenerLogStart(SPARK_VERSION)
+ val lines = readLines(logData)
+ lines.foreach { line =>
+ eventSet.foreach { event =>
+ if (line.contains(event)) {
+ val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+ val eventType = Utils.getFormattedClassName(parsedEvent)
+ if (eventType == event) {
+ eventSet.remove(event)
+ }
}
}
}
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+ assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+ } {
+ logData.close()
}
- assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
- assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
}
private def readLines(in: InputStream): Seq[String] = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 9e472f900b..ee95e4ff7d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
// ensure we reset the classloader after the test completes
val originalClassLoader = Thread.currentThread.getContextClassLoader
- try {
+ val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
+ Utils.tryWithSafeFinally {
// load the exception from the jar
- val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
loader.addURL(jarFile.toURI.toURL)
Thread.currentThread().setContextClassLoader(loader)
val excClass: Class[_] = Utils.classForName("repro.MyException")
@@ -209,8 +209,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
- } finally {
+ } {
Thread.currentThread.setContextClassLoader(originalClassLoader)
+ loader.close()
}
}