author     Eric Liang <ekl@databricks.com>      2016-09-30 23:51:36 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-09-30 23:51:36 -0700
commit     4bcd9b728b8df74756d16b27725c2db7c523d4b2 (patch)
tree       4e91f316bcd44cb852d9d9fd80c1e470da8ab673 /sql/core/src
parent     15e9bbb49e00b3982c428d39776725d0dea2cdfa (diff)
[SPARK-17740] Spark tests should mock / interpose HDFS to ensure that streams are closed
## What changes were proposed in this pull request?

As a followup to SPARK-17666, this ensures that filesystem connections are not leaked, at least in unit tests. This is done by intercepting filesystem calls, as suggested by JoshRosen. At the end of each test, we assert that no filesystem streams are left open. This applies to all tests using SharedSQLContext or SharedSparkContext.

## How was this patch tested?

I verified that tests in sql and core are indeed using the filesystem backend, and fixed the detected leaks. I also checked that reverting https://github.com/apache/spark/pull/15245 causes many actual test failures due to connection leaks.

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #15306 from ericl/sc-4672.
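To make the interposition concrete, here is a minimal sketch of the idea behind `DebugFilesystem`, not the actual `org.apache.spark.DebugFilesystem` class (the real one also records the stack trace of each `open()` so a leak can be traced back to its origin; extending `RawLocalFileSystem` and the `addOpenStream`/`removeOpenStream` helper names are assumptions here — only `clearOpenStreams()` and `assertNoOpenStreams()` appear in this patch):

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}

object DebugFilesystem {
  // All streams that have been opened but not yet closed.
  private val openStreams = ConcurrentHashMap.newKeySet[FSDataInputStream]()

  def clearOpenStreams(): Unit = openStreams.clear()

  def assertNoOpenStreams(): Unit = {
    assert(openStreams.isEmpty,
      s"${openStreams.size} filesystem stream(s) were leaked (opened but never closed)")
  }

  // Hypothetical registration helpers for this sketch.
  def addOpenStream(stream: FSDataInputStream): Unit = openStreams.add(stream)
  def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.remove(stream)
}

// Registered for the "file" scheme via spark.hadoop.fs.file.impl, so it
// replaces the default local filesystem in tests.
class DebugFilesystem extends RawLocalFileSystem {
  override def open(f: Path, bufferSize: Int): FSDataInputStream = {
    val inner = super.open(f, bufferSize)
    // Wrap the stream so that close() deregisters it from the open-stream set.
    val tracked: FSDataInputStream = new FSDataInputStream(inner) {
      override def close(): Unit = {
        super.close()
        DebugFilesystem.removeOpenStream(this)
      }
    }
    DebugFilesystem.addOpenStream(tracked)
    tracked
  }
}
```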
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala |  1 +
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala          |  3 ++-
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala                              | 19 ++++++++++++++++---
3 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index c7541889f2..00799301ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -104,6 +104,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContext
         assert(column.getUTF8String(3 * i + 1).toString == i.toString)
         assert(column.getUTF8String(3 * i + 2).toString == i.toString)
       }
+      reader.close()
     }
   }
 }
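One observation on this hunk: `reader.close()` runs only after every assertion above it passes, so a failing assertion would still leak the stream and surface as a less direct `assertNoOpenStreams()` failure. A more defensive shape, sketched below under the assumption that the suite builds its reader the way its other tests do (`file` is the Parquet file path obtained earlier; the no-arg `VectorizedParquetRecordReader` constructor matches this era of Spark), closes the reader unconditionally:

```scala
// Sketch: close the reader even if an assertion throws, so the per-test
// leak check never masks the original assertion failure.
val reader = new VectorizedParquetRecordReader
try {
  reader.initialize(file, null)
  // ... nextBatch() / getUTF8String(...) assertions as in the hunk above ...
} finally {
  reader.close()
}
```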
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 4259384f0b..9c1d26dcb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -203,13 +203,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
 
     // Open and delete
-    fm.open(path)
+    val f1 = fm.open(path)
     fm.delete(path)
     assert(!fm.exists(path))
     intercept[IOException] {
       fm.open(path)
     }
     fm.delete(path) // should not throw exception
+    f1.close()
 
     // Rename
     val path1 = new Path(s"$dir/file1")
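The shape of the bug this hunk fixes is worth spelling out: the old code called `fm.open(path)` purely for its side effect and discarded the returned stream, so nothing could ever close it. Condensed (the comments are mine, not part of the patch):

```scala
// Before: the FSDataInputStream returned by open() is discarded; the new
// per-test assertNoOpenStreams() check flags it as a leak.
fm.open(path)

// After: keep the handle and close it once the assertions are done. Closing
// after fm.delete(path) is fine on POSIX-style local filesystems, where an
// open file can be unlinked and still read until it is closed.
val f1 = fm.open(path)
// ... delete / exists / reopen assertions ...
f1.close()
```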
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 79c37faa4e..db24ee8b46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql.test
 
-import org.apache.spark.SparkConf
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
 
   protected val sparkConf = new SparkConf()
@@ -52,7 +54,8 @@ trait SharedSQLContext extends SQLTestUtils {
   protected override def beforeAll(): Unit = {
     SparkSession.sqlListener.set(null)
     if (_spark == null) {
-      _spark = new TestSparkSession(sparkConf)
+      _spark = new TestSparkSession(
+        sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
     }
     // Ensure we have initialized the context before calling parent code
     super.beforeAll()
@@ -71,4 +74,14 @@ trait SharedSQLContext extends SQLTestUtils {
       super.afterAll()
     }
   }
+
+  protected override def beforeEach(): Unit = {
+    super.beforeEach()
+    DebugFilesystem.clearOpenStreams()
+  }
+
+  protected override def afterEach(): Unit = {
+    super.afterEach()
+    DebugFilesystem.assertNoOpenStreams()
+  }
 }
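With these hooks, every suite that mixes in SharedSQLContext gets the leak check on each test with no further changes. A hypothetical example (the suite name and test body are illustrative only):

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSQLContext

class LeakCheckedExampleSuite extends QueryTest with SharedSQLContext {
  test("streams opened during a test must be closed") {
    // All "file" scheme access now goes through DebugFilesystem, so if this
    // query (or anything it touches) leaks a stream, afterEach() fails the
    // test via assertNoOpenStreams().
    checkAnswer(spark.range(3).toDF("id"), Seq(Row(0L), Row(1L), Row(2L)))
  }
}
```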