aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala14
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala4
-rw-r--r--external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala2
-rw-r--r--external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala2
-rw-r--r--external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala1
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala7
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala1
8 files changed, 30 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index 8b53d4f14a..f6ac89fc27 100644
--- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -51,6 +51,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
assert(fakeClassVersion === "1")
val fakeClass2 = classLoader.loadClass("FakeClass2").newInstance()
assert(fakeClass.getClass === fakeClass2.getClass)
+ classLoader.close()
+ parentLoader.close()
}
test("parent first") {
@@ -61,6 +63,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
assert(fakeClassVersion === "2")
val fakeClass2 = classLoader.loadClass("FakeClass1").newInstance()
assert(fakeClass.getClass === fakeClass2.getClass)
+ classLoader.close()
+ parentLoader.close()
}
test("child first can fall back") {
@@ -69,6 +73,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
+ classLoader.close()
+ parentLoader.close()
}
test("child first can fail") {
@@ -77,6 +83,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("FakeClassDoesNotExist").newInstance()
}
+ classLoader.close()
+ parentLoader.close()
}
test("default JDK classloader get resources") {
@@ -84,6 +92,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
val classLoader = new URLClassLoader(fileUrlsChild, parentLoader)
assert(classLoader.getResources("resource1").asScala.size === 2)
assert(classLoader.getResources("resource2").asScala.size === 1)
+ classLoader.close()
+ parentLoader.close()
}
test("parent first get resources") {
@@ -91,6 +101,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
val classLoader = new MutableURLClassLoader(fileUrlsChild, parentLoader)
assert(classLoader.getResources("resource1").asScala.size === 2)
assert(classLoader.getResources("resource2").asScala.size === 1)
+ classLoader.close()
+ parentLoader.close()
}
test("child first get resources") {
@@ -103,6 +115,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
res1.map(scala.io.Source.fromURL(_).mkString) should contain inOrderOnly
("resource1Contents-child", "resource1Contents-parent")
+ classLoader.close()
+ parentLoader.close()
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index 3d02ce0561..a897cad02f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -51,7 +51,8 @@ object LocalFileLR {
showWarning()
- val lines = scala.io.Source.fromFile(args(0)).getLines().toArray
+ val fileSrc = scala.io.Source.fromFile(args(0))
+ val lines = fileSrc.getLines().toArray
val points = lines.map(parsePoint _)
val ITERATIONS = args(1).toInt
@@ -69,6 +70,7 @@ object LocalFileLR {
w -= gradient
}
+ fileSrc.close()
println("Final w: " + w)
}
}
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 02aec43c3b..c81836da3c 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -272,6 +272,7 @@ class DirectKafkaStreamSuite
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
+ ssc.stop()
}
@@ -324,6 +325,7 @@ class DirectKafkaStreamSuite
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
+ ssc.stop()
}
// Test to verify the offset ranges can be recovered from the checkpoints
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index ab1c5055a2..8a747a5e29 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -184,6 +184,7 @@ class DirectKafkaStreamSuite
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
+ ssc.stop()
}
@@ -230,6 +231,7 @@ class DirectKafkaStreamSuite
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
+ ssc.stop()
}
// Test to verify the offset ranges can be recovered from the checkpoints
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 6a35ac14a8..426cd83b4d 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -80,5 +80,6 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(result.synchronized { sent === result })
}
+ ssc.stop()
}
}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 8f2c4fafa0..5d20ec958c 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -609,7 +609,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
test("SPARK-11043 check operation log root directory") {
val expectedLine =
"Operation log root directory is created: " + operationLogPath.getAbsoluteFile
- assert(Source.fromFile(logPath).getLines().exists(_.contains(expectedLine)))
+ val bufferSrc = Source.fromFile(logPath)
+ Utils.tryWithSafeFinally {
+ assert(bufferSrc.getLines().exists(_.contains(expectedLine)))
+ } {
+ bufferSrc.close()
+ }
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 41f16bfa5f..a1e9d1e023 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -815,6 +815,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
val ois = new ObjectInputStreamWithLoader(
new ByteArrayInputStream(bos.toByteArray), loader)
assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
+ ois.close()
}
test("SPARK-11267: the race condition of two checkpoints in a batch") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
index a2dbae149f..5f7f7fa5e6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -123,6 +123,7 @@ class JobGeneratorSuite extends TestSuiteBase {
assert(getBlocksOfBatch(longBatchTime).nonEmpty, "blocks of incomplete batch already deleted")
assert(batchCounter.getNumCompletedBatches < longBatchNumber)
waitLatch.countDown()
+ ssc.stop()
}
}
}