23 files changed, 181 insertions, 96 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0dcf0307e1..2c1d331b9a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1488,10 +1488,11 @@ private[spark] object Utils extends Logging { /** Return uncompressed file length of a compressed file. */ private def getCompressedFileLength(file: File): Long = { + var gzInputStream: GZIPInputStream = null try { // Uncompress .gz file to determine file size. var fileSize = 0L - val gzInputStream = new GZIPInputStream(new FileInputStream(file)) + gzInputStream = new GZIPInputStream(new FileInputStream(file)) val bufSize = 1024 val buf = new Array[Byte](bufSize) var numBytes = ByteStreams.read(gzInputStream, buf, 0, bufSize) @@ -1504,6 +1505,10 @@ private[spark] object Utils extends Logging { case e: Throwable => logError(s"Cannot get file length of ${file}", e) throw e + } finally { + if (gzInputStream != null) { + gzInputStream.close() + } } } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 442a603cae..6027310a96 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -505,7 +505,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6") if (Utils.isWindows) { assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""", - s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4") + s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4") } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index fd1689acf6..7e60410c90 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.kafka010 -import java.io.File +import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress import java.util.{Map => JMap, Properties} @@ -138,10 +138,21 @@ class KafkaTestUtils extends Logging { if (server != null) { server.shutdown() + server.awaitShutdown() server = null } - brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } + // On Windows, `logDirs` is left open even after Kafka server above is completely shut down + // in some cases. It leads to test failures on Windows if the directory deletion failure + // throws an exception. + brokerConf.logDirs.foreach { f => + try { + Utils.deleteRecursively(new File(f)) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + } if (zkUtils != null) { zkUtils.close() @@ -374,8 +385,21 @@ class KafkaTestUtils extends Logging { def shutdown() { factory.shutdown() - Utils.deleteRecursively(snapshotDir) - Utils.deleteRecursively(logDir) + // The directories are not closed even if the ZooKeeper server is shut down. + // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures + // on Windows if the directory deletion failure throws an exception. 
+ try { + Utils.deleteRecursively(snapshotDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + try { + Utils.deleteRecursively(logDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } } } } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index e73823e898..8273c2b49f 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kafka010 -import java.io.File +import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress import java.util.{Map => JMap, Properties} @@ -134,10 +134,21 @@ private[kafka010] class KafkaTestUtils extends Logging { if (server != null) { server.shutdown() + server.awaitShutdown() server = null } - brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } + // On Windows, `logDirs` is left open even after Kafka server above is completely shut down + // in some cases. It leads to test failures on Windows if the directory deletion failure + // throws an exception. + brokerConf.logDirs.foreach { f => + try { + Utils.deleteRecursively(new File(f)) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + } if (zkUtils != null) { zkUtils.close() @@ -273,8 +284,21 @@ private[kafka010] class KafkaTestUtils extends Logging { def shutdown() { factory.shutdown() - Utils.deleteRecursively(snapshotDir) - Utils.deleteRecursively(logDir) + // The directories are not closed even if the ZooKeeper server is shut down. + // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures + // on Windows if the directory deletion failure throws an exception. 
+ try { + Utils.deleteRecursively(snapshotDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + try { + Utils.deleteRecursively(logDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } } } } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index fde3714d3d..88a312a189 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -53,7 +53,6 @@ class DirectKafkaStreamSuite .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) - private var sc: SparkContext = _ private var ssc: StreamingContext = _ private var testDir: File = _ @@ -73,11 +72,7 @@ class DirectKafkaStreamSuite after { if (ssc != null) { - ssc.stop() - sc = null - } - if (sc != null) { - sc.stop() + ssc.stop(stopSparkContext = true) } if (testDir != null) { Utils.deleteRecursively(testDir) @@ -372,7 +367,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(20 seconds), interval(50 milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum) } @@ -411,7 +406,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(20 seconds), interval(50 milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum) } ssc.stop() diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 03c9ca7524..ef1968585b 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kafka -import java.io.File +import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress import java.util.{Map => JMap, Properties} @@ -137,10 +137,21 @@ private[kafka] class KafkaTestUtils extends Logging { if (server != null) { server.shutdown() + server.awaitShutdown() server = null } - brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } + // On Windows, `logDirs` is left open even after Kafka server above is completely shut down + // in some cases. It leads to test failures on Windows if the directory deletion failure + // throws an exception. + brokerConf.logDirs.foreach { f => + try { + Utils.deleteRecursively(new File(f)) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + } if (zkClient != null) { zkClient.close() @@ -268,8 +279,21 @@ private[kafka] class KafkaTestUtils extends Logging { def shutdown() { factory.shutdown() - Utils.deleteRecursively(snapshotDir) - Utils.deleteRecursively(logDir) + // The directories are not closed even if the ZooKeeper server is shut down. + // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures + // on Windows if the directory deletion failure throws an exception. 
+ try { + Utils.deleteRecursively(snapshotDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + try { + Utils.deleteRecursively(logDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } } } } diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 8a747a5e29..f8b34074f1 100644 --- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -52,7 +52,6 @@ class DirectKafkaStreamSuite .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) - private var sc: SparkContext = _ private var ssc: StreamingContext = _ private var testDir: File = _ @@ -72,11 +71,7 @@ class DirectKafkaStreamSuite after { if (ssc != null) { - ssc.stop() - sc = null - } - if (sc != null) { - sc.stop() + ssc.stop(stopSparkContext = true) } if (testDir != null) { Utils.deleteRecursively(testDir) @@ -276,7 +271,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(20 seconds), interval(50 milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum) } @@ -319,7 +314,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(20 seconds), interval(50 milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum) } ssc.stop() diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index 7b9aee39ff..57f89cc7db 100644 --- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -80,7 +80,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite after { if (ssc != null) { - ssc.stop() + ssc.stop(stopSparkContext = true) ssc = null } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 012b6ea4c5..ac6c3a89db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import scala.util.Try +import org.apache.commons.lang3.StringEscapeUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, Row, SparkSession} @@ -222,25 +223,34 @@ case class LoadDataCommand( val loadPath = if (isLocal) { val uri = Utils.resolveURI(path) - val filePath = uri.getPath() - val exists = if (filePath.contains("*")) { + val file = new File(uri.getPath) + val exists = if (file.getAbsolutePath.contains("*")) { val fileSystem = FileSystems.getDefault - val pathPattern = fileSystem.getPath(filePath) - val dir = pathPattern.getParent.toString + val dir = file.getParentFile.getAbsolutePath if (dir.contains("*")) { throw new AnalysisException( s"LOAD DATA input path allows only filename wildcard: $path") } + // Note that special characters such as "*" on 
Windows are not allowed as a path. + // Calling `WindowsFileSystem.getPath` throws an exception if they are in the path. + val dirPath = fileSystem.getPath(dir) + val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath + val safePathPattern = if (Utils.isWindows) { + // On Windows, the pattern should not start with slashes for absolute file paths. + pathPattern.stripPrefix("/") + } else { + pathPattern + } val files = new File(dir).listFiles() if (files == null) { false } else { - val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath) + val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) } } else { - new File(filePath).exists() + new File(file.getAbsolutePath).exists() } if (!exists) { throw new AnalysisException(s"LOAD DATA input path does not exist: $path") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index a8dd5102b7..dcb8e498a4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -165,7 +165,7 @@ private[hive] class TestHiveSparkSession( System.clearProperty("spark.hostPort") // For some hive test case which contain ${system:test.tmp.dir} - System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath) + System.setProperty("test.tmp.dir", Utils.createTempDir().toURI.getPath) /** The location of the compiled hive distribution */ lazy val hiveHome = envVarToFile("HIVE_HOME") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 9aa9ebf1aa..8f0d5d886c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -339,7 +339,13 @@ class HiveSparkSubmitSuite private def runSparkSubmit(args: Seq[String]): Unit = { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val history = ArrayBuffer.empty[String] - val commands = Seq("./bin/spark-submit") ++ args + val sparkSubmit = if (Utils.isWindows) { + // On Windows, `ProcessBuilder.directory` does not change the current working directory.
+ new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath + } else { + "./bin/spark-submit" + } + val commands = Seq(sparkSubmit) ++ args val commandLine = commands.mkString("'", "' '", "'") val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 13ef79e3b7..081f6f6d82 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -1071,11 +1071,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("CTAS: persisted bucketed data source table") { withTempPath { dir => withTable("t") { - val path = dir.getCanonicalPath - sql( s"""CREATE TABLE t USING PARQUET - |OPTIONS (PATH '$path') + |OPTIONS (PATH '${dir.toURI}') |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS |AS SELECT 1 AS a, 2 AS b """.stripMargin @@ -1093,11 +1091,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTempPath { dir => withTable("t") { - val path = dir.getCanonicalPath - sql( s"""CREATE TABLE t USING PARQUET - |OPTIONS (PATH '$path') + |OPTIONS (PATH '${dir.toURI}') |CLUSTERED BY (a) INTO 2 BUCKETS |AS SELECT 1 AS a, 2 AS b """.stripMargin diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index 44233cfbf0..dca207a72d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -172,7 +172,7 @@ class PartitionProviderCompatibilitySuite withTempDir { dir2 => sql( s"""alter table test partition (partCol=1) - |set location '${dir2.getAbsolutePath}'""".stripMargin) + |set location '${dir2.toURI}'""".stripMargin) assert(sql("select * from test").count() == 4) sql( """insert overwrite table test diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index 55b72c625d..70750c4e05 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala @@ -77,7 +77,7 @@ class PartitionedTablePerfStatsSuite |create external table $tableName (fieldOne long) |partitioned by (partCol1 int, partCol2 int) |stored as parquet - |location "${dir.getAbsolutePath}"""".stripMargin) + |location "${dir.toURI}"""".stripMargin) if (repair) { spark.sql(s"msck repair table $tableName") } @@ -102,7 +102,7 @@ class PartitionedTablePerfStatsSuite spark.sql(s""" |create table $tableName (fieldOne long, partCol1 int, partCol2 int) |using parquet - |options (path "${dir.getAbsolutePath}") + |options (path "${dir.toURI}") |partitioned by (partCol1, partCol2)""".stripMargin) if (repair) { spark.sql(s"msck repair table $tableName") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala index 68df809434..cc26b32184 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala @@ -146,7 +146,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing | c1 INT COMMENT 'bla', | c2 STRING |) - |LOCATION '$dir' + |LOCATION '${dir.toURI}' |TBLPROPERTIES ( | 'prop1' = 'value1', | 'prop2' = 'value2' diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 8803ea36de..b040f26d28 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -57,7 +57,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto \"separatorChar\" = \",\", \"quoteChar\" = \"\\\"\", \"escapeChar\" = \"\\\\\") - LOCATION '$tempDir' + LOCATION '${tempDir.toURI}' """) spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 77285282a6..0af331e67b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -87,7 +87,7 @@ class HiveDDLSuite s""" |create table $tabName |stored as parquet - |location '$tmpDir' + |location '${tmpDir.toURI}' |as select 1, '3' """.stripMargin) @@ -269,7 +269,7 @@ class HiveDDLSuite s""" |CREATE EXTERNAL TABLE $externalTab (key INT, value STRING) |PARTITIONED BY (ds STRING, hr STRING) - |LOCATION '$basePath' + |LOCATION '${tmpDir.toURI}' """.stripMargin) // Before data insertion, all the directory are empty @@ -678,14 +678,10 @@ class HiveDDLSuite } else { assert(!fs.exists(new Path(tmpDir.toString))) } - sql(s"CREATE DATABASE $dbName Location '$tmpDir'") + sql(s"CREATE DATABASE $dbName Location '${tmpDir.toURI.getPath.stripSuffix("/")}'") val db1 = catalog.getDatabaseMetadata(dbName) - val dbPath = "file:" + tmpDir - assert(db1 == CatalogDatabase( - dbName, - "", - if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath, - Map.empty)) + val dbPath = tmpDir.toURI.toString.stripSuffix("/") + assert(db1 == CatalogDatabase(dbName, "", dbPath, Map.empty)) sql("USE db1") sql(s"CREATE TABLE $tabName as SELECT 1") @@ -713,10 +709,6 @@ class HiveDDLSuite } } - private def appendTrailingSlash(path: String): String = { - if (!path.endsWith(File.separator)) path + File.separator else path - } - private def dropDatabase(cascade: Boolean, tableExists: Boolean): Unit = { val dbName = "db1" val dbPath = new Path(spark.sessionState.conf.warehousePath) @@ -724,7 +716,7 @@ class HiveDDLSuite sql(s"CREATE DATABASE $dbName") val catalog = spark.sessionState.catalog - val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db" + val expectedDBLocation = s"file:${dbPath.toUri.getPath.stripSuffix("/")}/$dbName.db" val db1 = catalog.getDatabaseMetadata(dbName) assert(db1 == CatalogDatabase( dbName, @@ -857,7 +849,7 @@ class HiveDDLSuite val path = dir.getCanonicalPath spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd) .write.format("parquet").save(path) - sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '$path')") + sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '${dir.toURI}')") sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName") // The source table should be an 
external data source table @@ -894,7 +886,7 @@ class HiveDDLSuite test("CREATE TABLE LIKE an external Hive serde table") { val catalog = spark.sessionState.catalog withTempDir { tmpDir => - val basePath = tmpDir.getCanonicalPath + val basePath = tmpDir.toURI val sourceTabName = "tab1" val targetTabName = "tab2" withTable(sourceTabName, targetTabName) { @@ -1112,7 +1104,7 @@ class HiveDDLSuite Seq("parquet", "json", "orc").foreach { fileFormat => withTable("t1") { withTempPath { dir => - val path = dir.getCanonicalPath + val path = dir.toURI.toString spark.range(1).write.format(fileFormat).save(path) sql(s"CREATE TABLE t1 USING $fileFormat OPTIONS (PATH '$path')") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index f65b5f4daa..f47cf4a9c6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -126,7 +126,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { s""" |CREATE FUNCTION udtf_count_temp |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' - |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}' + |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}' """.stripMargin) checkAnswer( @@ -321,7 +321,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { s""" |CREATE FUNCTION udtf_count |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' - |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}' + |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}' """.stripMargin) checkKeywordsExist(sql("describe function udtf_count"), @@ -644,7 +644,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { withTempDir { dir => val defaultDataSource = sessionState.conf.defaultDataSourceName - val tempLocation = dir.getCanonicalPath + val tempLocation = dir.toURI.getPath.stripSuffix("/") sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c1'" + " AS SELECT key k, value FROM src ORDER BY k, value") checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c1")) @@ -1953,16 +1953,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") { withTempDir { dir => + val path = dir.toURI.toString.stripSuffix("/") + val dirPath = dir.getAbsoluteFile for (i <- 1 to 3) { - Files.write(s"$i", new File(s"$dir/part-r-0000$i"), StandardCharsets.UTF_8) + Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8) } for (i <- 5 to 7) { - Files.write(s"$i", new File(s"$dir/part-s-0000$i"), StandardCharsets.UTF_8) + Files.write(s"$i", new File(dirPath, s"part-s-0000$i"), StandardCharsets.UTF_8) } withTable("load_t") { sql("CREATE TABLE load_t (a STRING)") - sql(s"LOAD DATA LOCAL INPATH '$dir/*part-r*' INTO TABLE load_t") + sql(s"LOAD DATA LOCAL INPATH '$path/*part-r*' INTO TABLE load_t") checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3"))) val m = intercept[AnalysisException] { @@ -1971,7 +1973,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { assert(m.contains("LOAD DATA input path does not exist")) val m2 = intercept[AnalysisException] { - sql(s"LOAD DATA LOCAL INPATH '$dir*/*part*' INTO TABLE load_t") + 
sql(s"LOAD DATA LOCAL INPATH '$path*/*part*' INTO TABLE load_t") }.getMessage assert(m2.contains("LOAD DATA input path allows only filename wildcard")) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala index 0ff3511c87..a20c758a83 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala @@ -43,7 +43,7 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto | p_retailprice DOUBLE, | p_comment STRING) """.stripMargin) - val testData1 = TestHive.getHiveFile("data/files/part_tiny.txt").getCanonicalPath + val testData1 = TestHive.getHiveFile("data/files/part_tiny.txt").toURI sql( s""" |LOAD DATA LOCAL INPATH '$testData1' overwrite into table part diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 2b40469051..fe1e17dd08 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -57,7 +57,7 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA | stringField STRING |) |STORED AS ORC - |LOCATION '${orcTableAsDir.getCanonicalPath}' + |LOCATION '${orcTableAsDir.toURI}' """.stripMargin) sql( @@ -172,7 +172,7 @@ class OrcSourceSuite extends OrcSuite { s"""CREATE TEMPORARY VIEW normal_orc_source |USING org.apache.spark.sql.hive.orc |OPTIONS ( - | PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}' + | PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}' |) """.stripMargin) @@ -180,7 +180,7 @@ class OrcSourceSuite extends OrcSuite { s"""CREATE TEMPORARY VIEW normal_orc_as_source |USING org.apache.spark.sql.hive.orc |OPTIONS ( - | PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}' + | PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}' |) """.stripMargin) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 2ce60fe589..d3e04d4bf2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -81,7 +81,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${partitionedTableDir.getCanonicalPath}' + location '${partitionedTableDir.toURI}' """) sql(s""" @@ -95,7 +95,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${partitionedTableDirWithKey.getCanonicalPath}' + location '${partitionedTableDirWithKey.toURI}' """) sql(s""" @@ -108,7 +108,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${new File(normalTableDir, "normal").getCanonicalPath}' + location '${new 
File(normalTableDir, "normal").toURI}' """) sql(s""" @@ -124,7 +124,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}' + LOCATION '${partitionedTableDirWithComplexTypes.toURI}' """) sql(s""" @@ -140,7 +140,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}' + LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}' """) sql( @@ -561,7 +561,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { test("SPARK-15248: explicitly added partitions should be readable") { withTable("test_added_partitions", "test_temp") { withTempDir { src => - val partitionDir = new File(src, "partition").getCanonicalPath + val partitionDir = new File(src, "partition").toURI sql( """ |CREATE TABLE test_added_partitions (a STRING) @@ -636,7 +636,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { CREATE TEMPORARY VIEW partitioned_parquet USING org.apache.spark.sql.parquet OPTIONS ( - path '${partitionedTableDir.getCanonicalPath}' + path '${partitionedTableDir.toURI}' ) """) @@ -644,7 +644,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { CREATE TEMPORARY VIEW partitioned_parquet_with_key USING org.apache.spark.sql.parquet OPTIONS ( - path '${partitionedTableDirWithKey.getCanonicalPath}' + path '${partitionedTableDirWithKey.toURI}' ) """) @@ -652,7 +652,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { CREATE TEMPORARY VIEW normal_parquet USING org.apache.spark.sql.parquet OPTIONS ( - path '${new File(partitionedTableDir, "p=1").getCanonicalPath}' + path '${new File(partitionedTableDir, "p=1").toURI}' ) """) @@ -660,7 +660,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes USING org.apache.spark.sql.parquet OPTIONS ( - path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}' + path '${partitionedTableDirWithKeyAndComplexTypes.toURI}' ) """) @@ -668,7 +668,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes USING org.apache.spark.sql.parquet OPTIONS ( - path '${partitionedTableDirWithComplexTypes.getCanonicalPath}' + path '${partitionedTableDirWithComplexTypes.toURI}' ) """) } @@ -701,8 +701,6 @@ class ParquetSourceSuite extends ParquetPartitioningTest { test("SPARK-8811: compatibility with array of struct in Hive") { withTempPath { dir => - val path = dir.getCanonicalPath - withTable("array_of_struct") { val conf = Seq( HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false", @@ -712,7 +710,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { withSQLConf(conf: _*) { sql( s"""CREATE TABLE array_of_struct - |STORED AS PARQUET LOCATION '$path' + |STORED AS PARQUET LOCATION '${dir.toURI}' |AS SELECT | '1st' AS a, | '2nd' AS b, @@ -720,7 +718,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { """.stripMargin) checkAnswer( - spark.read.parquet(path), + spark.read.parquet(dir.getCanonicalPath), Row("1st", "2nd", Seq(Row("val_a", "val_b")))) } } diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 5ba09a54af..eca7c79465 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -175,6 +175,12 @@ private[streaming] class ReceiverSupervisorImpl( } override protected def onStop(message: String, error: Option[Throwable]) { + receivedBlockHandler match { + case handler: WriteAheadLogBasedBlockHandler => + // Write ahead log should be closed. + handler.stop() + case _ => + } registeredBlockGenerators.asScala.foreach { _.stop() } env.rpcEnv.stop(endpoint) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index a1e9d1e023..7fcf45e7de 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream} +import java.io._ import java.nio.charset.StandardCharsets import java.util.concurrent.ConcurrentLinkedQueue @@ -629,7 +629,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized { fileInputDStream.batchTimeToSelectedFiles.values.flatten } - filenames.map(_.split(File.separator).last.toInt).toSeq.sorted + filenames.map(_.split("/").last.toInt).toSeq.sorted } try { @@ -755,7 +755,15 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet) } } finally { - Utils.deleteRecursively(testDir) + try { + // As the driver shuts down in the middle of processing and the thread above sleeps + // for a while, `testDir` may not be closed correctly at this point, which causes + // test failures on Windows. + Utils.deleteRecursively(testDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } } }
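
The recurring patterns in this change, illustrated below as standalone sketches; all object and method names in the sketches are illustrative, not part of the patch. First, the Utils.getCompressedFileLength fix is the standard declare-outside/close-in-finally pattern: the stream is declared before the try block so the finally clause can close it even when construction or a read throws. A minimal sketch, using plain InputStream.read in place of Guava's ByteStreams.read:

    import java.io.{File, FileInputStream}
    import java.util.zip.GZIPInputStream

    // Declare the stream outside `try` so `finally` can close it even when a
    // read throws; a leaked open handle keeps the file locked on Windows.
    def uncompressedLength(file: File): Long = {
      var in: GZIPInputStream = null
      try {
        in = new GZIPInputStream(new FileInputStream(file))
        val buf = new Array[Byte](1024)
        var total = 0L
        var n = in.read(buf)
        while (n != -1) {
          total += n
          n = in.read(buf)
        }
        total
      } finally {
        if (in != null) in.close()
      }
    }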
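The UtilsSuite expectation changes because '#' is the URI fragment delimiter: a Windows path such as C:\pi.py#py.pi must carry the '#' percent-encoded as %23 in its file: URI, or everything after it is parsed as a fragment. A quick illustration with the same hypothetical path:

    import java.net.URI

    object UriFragmentDemo extends App {
      val encoded = new URI("file:/C:/pi.py%23py.pi")
      println(encoded.getPath)     // /C:/pi.py#py.pi (decoded back on access)
      println(encoded.getFragment) // null

      val raw = new URI("file:/C:/pi.py#py.pi")
      println(raw.getPath)         // /C:/pi.py  (the suffix was lost...)
      println(raw.getFragment)     // py.pi      (...into the fragment)
    }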
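The Kafka and ZooKeeper test-utility hunks all apply the same guard: the broker (and, per ZOOKEEPER-1844, the ZooKeeper server before 3.4.6) can still hold its log directories open at shutdown, so on Windows the deletion is best-effort. A sketch of the pattern, with stand-ins for Spark's Utils.deleteRecursively and Utils.isWindows:

    import java.io.{File, IOException}

    object BestEffortDelete {
      val isWindows: Boolean =
        System.getProperty("os.name").toLowerCase.contains("windows")

      // Stand-in for Utils.deleteRecursively: throws IOException when a file
      // cannot be removed, e.g. because another process still holds it open.
      def deleteRecursively(f: File): Unit = {
        if (f.isDirectory) Option(f.listFiles()).toSeq.flatten.foreach(deleteRecursively)
        if (!f.delete() && f.exists()) throw new IOException(s"Failed to delete: $f")
      }

      // On Windows, downgrade the failure to a warning instead of failing the
      // test; on other platforms the exception still propagates.
      def cleanUp(dir: File): Unit = {
        try deleteRecursively(dir) catch {
          case e: IOException if isWindows => System.err.println(s"WARN: ${e.getMessage}")
        }
      }
    }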
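The LoadDataCommand hunk works around two java.nio quirks on Windows: WindowsFileSystem.getPath rejects characters such as '*', so only the wildcard-free parent directory may become a Path, and a glob pattern for an absolute path must not start with '/'. A sketch of that existence check, mirroring the patched logic under illustrative names:

    import java.io.File
    import java.nio.file.FileSystems

    object WildcardCheck {
      val isWindows: Boolean =
        System.getProperty("os.name").toLowerCase.contains("windows")

      // `file` carries a filename wildcard, e.g. new File("/data/part-r*").
      def wildcardExists(file: File): Boolean = {
        val fs = FileSystems.getDefault
        // Only the wildcard-free parent becomes a java.nio Path.
        val dir = fs.getPath(file.getParentFile.getAbsolutePath)
        val pattern = new File(dir.toAbsolutePath.toString, file.getName).toURI.getPath
        // On Windows the glob must not start with a slash for absolute paths.
        val safePattern = if (isWindows) pattern.stripPrefix("/") else pattern
        val matcher = fs.getPathMatcher("glob:" + safePattern)
        Option(new File(dir.toString).listFiles()).toSeq.flatten
          .exists(f => matcher.matches(fs.getPath(f.getAbsolutePath)))
      }
    }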
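The HiveSparkSubmitSuite hunk reflects how ProcessBuilder resolves a program name: ProcessBuilder.directory only sets the child's working directory, and on Windows a relative command such as ./bin/spark-submit is resolved against the parent JVM's current directory rather than that setting, so the launcher script is addressed absolutely. A sketch under that assumption (the path arithmetic is illustrative, not the committed relative segments):

    import java.io.File

    object SparkSubmitCommand {
      val isWindows: Boolean =
        System.getProperty("os.name").toLowerCase.contains("windows")

      // ProcessBuilder.directory(new File(sparkHome)) changes only where the
      // child runs, not where Windows looks up its executable, so hand
      // CreateProcess an absolute path to the .cmd launcher.
      def command(sparkHome: String, args: Seq[String]): Seq[String] = {
        val submit =
          if (isWindows) new File(sparkHome, "bin\\spark-submit.cmd").getAbsolutePath
          else "./bin/spark-submit"
        Seq(submit) ++ args
      }
    }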
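Finally, most of the SQL test churn is one substitution: embed dir.toURI rather than dir.getCanonicalPath in SQL text. On Windows the canonical form is C:\tmp\spark-..., and its backslashes and drive prefix are misread inside quoted LOCATION/PATH/USING JAR clauses, while the URI form uses forward slashes on every platform. A small sketch of the difference:

    import java.io.File

    object PathInSql extends App {
      val dir = new File(System.getProperty("java.io.tmpdir"), "spark-test")
      // Windows: file:/C:/Users/.../spark-test/ -- safe inside quoted SQL.
      val portable = s"CREATE TABLE t USING parquet OPTIONS (PATH '${dir.toURI}')"
      // Windows: C:\Users\...\spark-test -- backslashes are mangled in SQL text.
      val fragile = s"CREATE TABLE t USING parquet OPTIONS (PATH '${dir.getCanonicalPath}')"
      println(portable)
      println(fragile)
    }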