From 4bcd9b728b8df74756d16b27725c2db7c523d4b2 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 30 Sep 2016 23:51:36 -0700 Subject: [SPARK-17740] Spark tests should mock / interpose HDFS to ensure that streams are closed ## What changes were proposed in this pull request? As a followup to SPARK-17666, ensure filesystem connections are not leaked at least in unit tests. This is done here by intercepting filesystem calls as suggested by JoshRosen . At the end of each test, we assert no filesystem streams are left open. This applies to all tests using SharedSQLContext or SharedSparkContext. ## How was this patch tested? I verified that tests in sql and core are indeed using the filesystem backend, and fixed the detected leaks. I also checked that reverting https://github.com/apache/spark/pull/15245 causes many actual test failures due to connection leaks. Author: Eric Liang Author: Eric Liang Closes #15306 from ericl/sc-4672. --- .../datasources/parquet/ParquetEncodingSuite.scala | 1 + .../execution/streaming/HDFSMetadataLogSuite.scala | 3 ++- .../org/apache/spark/sql/test/SharedSQLContext.scala | 19 ++++++++++++++++--- 3 files changed, 19 insertions(+), 4 deletions(-) (limited to 'sql/core/src') diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala index c7541889f2..00799301ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala @@ -104,6 +104,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex assert(column.getUTF8String(3 * i + 1).toString == i.toString) assert(column.getUTF8String(3 * i + 2).toString == i.toString) } + reader.close() } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 4259384f0b..9c1d26dcb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -203,13 +203,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } // Open and delete - fm.open(path) + val f1 = fm.open(path) fm.delete(path) assert(!fm.exists(path)) intercept[IOException] { fm.open(path) } fm.delete(path) // should not throw exception + f1.close() // Rename val path1 = new Path(s"$dir/file1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index 79c37faa4e..db24ee8b46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -17,14 +17,16 @@ package org.apache.spark.sql.test -import org.apache.spark.SparkConf +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.sql.{SparkSession, SQLContext} /** * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]]. */ -trait SharedSQLContext extends SQLTestUtils { +trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach { protected val sparkConf = new SparkConf() @@ -52,7 +54,8 @@ trait SharedSQLContext extends SQLTestUtils { protected override def beforeAll(): Unit = { SparkSession.sqlListener.set(null) if (_spark == null) { - _spark = new TestSparkSession(sparkConf) + _spark = new TestSparkSession( + sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)) } // Ensure we have initialized the context before calling parent code super.beforeAll() @@ -71,4 +74,14 @@ trait SharedSQLContext extends SQLTestUtils { super.afterAll() } } + + protected override def beforeEach(): Unit = { + super.beforeEach() + DebugFilesystem.clearOpenStreams() + } + + protected override def afterEach(): Unit = { + super.afterEach() + DebugFilesystem.assertNoOpenStreams() + } } -- cgit v1.2.3