-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 2
-rw-r--r--  external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 32
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 32
-rw-r--r--  external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 11
-rw-r--r--  external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 32
-rw-r--r--  external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 11
-rw-r--r--  external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 26
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 28
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 14
23 files changed, 181 insertions(+), 96 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0dcf0307e1..2c1d331b9a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1488,10 +1488,11 @@ private[spark] object Utils extends Logging {
/** Return uncompressed file length of a compressed file. */
private def getCompressedFileLength(file: File): Long = {
+ var gzInputStream: GZIPInputStream = null
try {
// Uncompress .gz file to determine file size.
var fileSize = 0L
- val gzInputStream = new GZIPInputStream(new FileInputStream(file))
+ gzInputStream = new GZIPInputStream(new FileInputStream(file))
val bufSize = 1024
val buf = new Array[Byte](bufSize)
var numBytes = ByteStreams.read(gzInputStream, buf, 0, bufSize)
@@ -1504,6 +1505,10 @@ private[spark] object Utils extends Logging {
case e: Throwable =>
logError(s"Cannot get file length of ${file}", e)
throw e
+ } finally {
+ if (gzInputStream != null) {
+ gzInputStream.close()
+ }
}
}
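For context, the change above is the standard idiom for closing a stream that can fail partway through: declare it before the try, close it in finally. A minimal self-contained sketch of the same idea (illustrative only; the real method also uses Guava's ByteStreams.read and Spark's error logging):

import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream

object GzipLengthExample {
  // Declare the stream before `try` so that `finally` can close it even if the
  // constructor or the read loop throws; a leaked handle keeps the file locked
  // on Windows, which is exactly what the change above guards against.
  def uncompressedLength(file: File): Long = {
    var in: GZIPInputStream = null
    try {
      in = new GZIPInputStream(new FileInputStream(file))
      val buf = new Array[Byte](1024)
      var total = 0L
      var n = in.read(buf)
      while (n != -1) {
        total += n
        n = in.read(buf)
      }
      total
    } finally {
      if (in != null) {
        in.close()
      }
    }
  }
}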
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 442a603cae..6027310a96 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -505,7 +505,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6")
if (Utils.isWindows) {
assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""",
- s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4")
+ s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4")
}
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index fd1689acf6..7e60410c90 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.kafka010
-import java.io.File
+import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.util.{Map => JMap, Properties}
@@ -138,10 +138,21 @@ class KafkaTestUtils extends Logging {
if (server != null) {
server.shutdown()
+ server.awaitShutdown()
server = null
}
- brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+ // On Windows, `logDirs` is left open in some cases even after the Kafka server
+ // above is completely shut down. This leads to test failures on Windows when the
+ // directory deletion fails with an exception.
+ brokerConf.logDirs.foreach { f =>
+ try {
+ Utils.deleteRecursively(new File(f))
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ }
if (zkUtils != null) {
zkUtils.close()
@@ -374,8 +385,21 @@ class KafkaTestUtils extends Logging {
def shutdown() {
factory.shutdown()
- Utils.deleteRecursively(snapshotDir)
- Utils.deleteRecursively(logDir)
+ // The directories are not closed even if the ZooKeeper server is shut down.
+ // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. This leads to test failures
+ // on Windows when the directory deletion fails with an exception.
+ try {
+ Utils.deleteRecursively(snapshotDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ try {
+ Utils.deleteRecursively(logDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
}
}
}
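The same delete-but-tolerate-IOException-on-Windows shape recurs here for the broker logDirs and the ZooKeeper snapshotDir/logDir, and again in the other Kafka modules below. A hypothetical helper capturing the pattern might look like this sketch; quietDeleteOnWindows is not part of the patch, and Utils.deleteRecursively is replaced by a plain recursive delete only to keep the example self-contained:

import java.io.{File, IOException}

object WindowsTolerantDelete {
  // Hypothetical helper (not in the patch): recursively delete a directory, but on
  // Windows downgrade an IOException to a warning, since a handle still held by
  // Kafka or ZooKeeper makes the deletion fail there.
  def quietDeleteOnWindows(dir: File, isWindows: Boolean, warn: String => Unit): Unit = {
    def delete(f: File): Unit = {
      if (f.isDirectory) {
        Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(delete)
      }
      if (!f.delete() && f.exists()) {
        throw new IOException(s"Failed to delete: ${f.getAbsolutePath}")
      }
    }
    try {
      delete(dir) // stand-in for Utils.deleteRecursively(dir)
    } catch {
      case e: IOException if isWindows => warn(e.getMessage)
    }
  }
}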
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index e73823e898..8273c2b49f 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.kafka010
-import java.io.File
+import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.util.{Map => JMap, Properties}
@@ -134,10 +134,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
if (server != null) {
server.shutdown()
+ server.awaitShutdown()
server = null
}
- brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+ // On Windows, `logDirs` is left open in some cases even after the Kafka server
+ // above is completely shut down. This leads to test failures on Windows when the
+ // directory deletion fails with an exception.
+ brokerConf.logDirs.foreach { f =>
+ try {
+ Utils.deleteRecursively(new File(f))
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ }
if (zkUtils != null) {
zkUtils.close()
@@ -273,8 +284,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
def shutdown() {
factory.shutdown()
- Utils.deleteRecursively(snapshotDir)
- Utils.deleteRecursively(logDir)
+ // The directories are not closed even if the ZooKeeper server is shut down.
+ // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. This leads to test failures
+ // on Windows when the directory deletion fails with an exception.
+ try {
+ Utils.deleteRecursively(snapshotDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ try {
+ Utils.deleteRecursively(logDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
}
}
}
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index fde3714d3d..88a312a189 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -53,7 +53,6 @@ class DirectKafkaStreamSuite
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
- private var sc: SparkContext = _
private var ssc: StreamingContext = _
private var testDir: File = _
@@ -73,11 +72,7 @@ class DirectKafkaStreamSuite
after {
if (ssc != null) {
- ssc.stop()
- sc = null
- }
- if (sc != null) {
- sc.stop()
+ ssc.stop(stopSparkContext = true)
}
if (testDir != null) {
Utils.deleteRecursively(testDir)
@@ -372,7 +367,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
}
@@ -411,7 +406,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
}
ssc.stop()
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 03c9ca7524..ef1968585b 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.kafka
-import java.io.File
+import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.util.{Map => JMap, Properties}
@@ -137,10 +137,21 @@ private[kafka] class KafkaTestUtils extends Logging {
if (server != null) {
server.shutdown()
+ server.awaitShutdown()
server = null
}
- brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+ // On Windows, `logDirs` is left open in some cases even after the Kafka server
+ // above is completely shut down. This leads to test failures on Windows when the
+ // directory deletion fails with an exception.
+ brokerConf.logDirs.foreach { f =>
+ try {
+ Utils.deleteRecursively(new File(f))
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ }
if (zkClient != null) {
zkClient.close()
@@ -268,8 +279,21 @@ private[kafka] class KafkaTestUtils extends Logging {
def shutdown() {
factory.shutdown()
- Utils.deleteRecursively(snapshotDir)
- Utils.deleteRecursively(logDir)
+ // The directories are not closed even if the ZooKeeper server is shut down.
+ // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. This leads to test failures
+ // on Windows when the directory deletion fails with an exception.
+ try {
+ Utils.deleteRecursively(snapshotDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ try {
+ Utils.deleteRecursively(logDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
}
}
}
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 8a747a5e29..f8b34074f1 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -52,7 +52,6 @@ class DirectKafkaStreamSuite
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
- private var sc: SparkContext = _
private var ssc: StreamingContext = _
private var testDir: File = _
@@ -72,11 +71,7 @@ class DirectKafkaStreamSuite
after {
if (ssc != null) {
- ssc.stop()
- sc = null
- }
- if (sc != null) {
- sc.stop()
+ ssc.stop(stopSparkContext = true)
}
if (testDir != null) {
Utils.deleteRecursively(testDir)
@@ -276,7 +271,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
}
@@ -319,7 +314,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
}
ssc.stop()
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 7b9aee39ff..57f89cc7db 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -80,7 +80,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
after {
if (ssc != null) {
- ssc.stop()
+ ssc.stop(stopSparkContext = true)
ssc = null
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 012b6ea4c5..ac6c3a89db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import scala.util.Try
+import org.apache.commons.lang3.StringEscapeUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -222,25 +223,34 @@ case class LoadDataCommand(
val loadPath =
if (isLocal) {
val uri = Utils.resolveURI(path)
- val filePath = uri.getPath()
- val exists = if (filePath.contains("*")) {
+ val file = new File(uri.getPath)
+ val exists = if (file.getAbsolutePath.contains("*")) {
val fileSystem = FileSystems.getDefault
- val pathPattern = fileSystem.getPath(filePath)
- val dir = pathPattern.getParent.toString
+ val dir = file.getParentFile.getAbsolutePath
if (dir.contains("*")) {
throw new AnalysisException(
s"LOAD DATA input path allows only filename wildcard: $path")
}
+ // Note that special characters such as "*" are not allowed in a path on Windows.
+ // Calling `WindowsFileSystem.getPath` throws an exception if they are in the path.
+ val dirPath = fileSystem.getPath(dir)
+ val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath
+ val safePathPattern = if (Utils.isWindows) {
+ // On Windows, the pattern should not start with slashes for absolute file paths.
+ pathPattern.stripPrefix("/")
+ } else {
+ pathPattern
+ }
val files = new File(dir).listFiles()
if (files == null) {
false
} else {
- val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
+ val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern)
files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
}
} else {
- new File(filePath).exists()
+ new File(file.getAbsolutePath).exists()
}
if (!exists) {
throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a8dd5102b7..dcb8e498a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -165,7 +165,7 @@ private[hive] class TestHiveSparkSession(
System.clearProperty("spark.hostPort")
// For some hive test case which contain ${system:test.tmp.dir}
- System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath)
+ System.setProperty("test.tmp.dir", Utils.createTempDir().toURI.getPath)
/** The location of the compiled hive distribution */
lazy val hiveHome = envVarToFile("HIVE_HOME")
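The getCanonicalPath to toURI.getPath switch only matters on Windows, where the canonical form contains backslashes that Hive's ${system:test.tmp.dir} substitution does not handle well, while the URI path form always uses forward slashes. A tiny illustration (the Windows values in the comments are examples, not output from this patch):

import java.io.File

object TempDirPathExample extends App {
  // The two accessors agree on Unix but differ on Windows.
  val dir = new File(System.getProperty("java.io.tmpdir"))
  println(s"canonical: ${dir.getCanonicalPath}") // Windows: C:\Users\me\AppData\Local\Temp
  println(s"uri path : ${dir.toURI.getPath}")    // Windows: /C:/Users/me/AppData/Local/Temp/
}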
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 9aa9ebf1aa..8f0d5d886c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -339,7 +339,13 @@ class HiveSparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val history = ArrayBuffer.empty[String]
- val commands = Seq("./bin/spark-submit") ++ args
+ val sparkSubmit = if (Utils.isWindows) {
+ // On Windows, `ProcessBuilder.directory` does not change the current working directory.
+ new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
+ } else {
+ "./bin/spark-submit"
+ }
+ val commands = Seq(sparkSubmit) ++ args
val commandLine = commands.mkString("'", "' '", "'")
val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
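A hedged sketch of the platform-dependent launcher choice follows. The suite itself uses a path relative to its working directory ("..\..\bin\spark-submit.cmd"); here the script is resolved from an assumed sparkHome argument to keep the example self-contained, and output handling is omitted:

import java.io.File

object SparkSubmitLauncherExample {
  def launch(sparkHome: String, args: Seq[String], isWindows: Boolean): Process = {
    val sparkSubmit = if (isWindows) {
      // Use the .cmd wrapper and an absolute path, since ProcessBuilder.directory
      // does not influence how a relative command name is resolved on Windows.
      new File(new File(sparkHome, "bin"), "spark-submit.cmd").getAbsolutePath
    } else {
      "./bin/spark-submit"
    }
    val commands = Seq(sparkSubmit) ++ args
    new ProcessBuilder(commands: _*).directory(new File(sparkHome)).start()
  }
}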
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 13ef79e3b7..081f6f6d82 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1071,11 +1071,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
test("CTAS: persisted bucketed data source table") {
withTempPath { dir =>
withTable("t") {
- val path = dir.getCanonicalPath
-
sql(
s"""CREATE TABLE t USING PARQUET
- |OPTIONS (PATH '$path')
+ |OPTIONS (PATH '${dir.toURI}')
|CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
|AS SELECT 1 AS a, 2 AS b
""".stripMargin
@@ -1093,11 +1091,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
withTempPath { dir =>
withTable("t") {
- val path = dir.getCanonicalPath
-
sql(
s"""CREATE TABLE t USING PARQUET
- |OPTIONS (PATH '$path')
+ |OPTIONS (PATH '${dir.toURI}')
|CLUSTERED BY (a) INTO 2 BUCKETS
|AS SELECT 1 AS a, 2 AS b
""".stripMargin
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 44233cfbf0..dca207a72d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -172,7 +172,7 @@ class PartitionProviderCompatibilitySuite
withTempDir { dir2 =>
sql(
s"""alter table test partition (partCol=1)
- |set location '${dir2.getAbsolutePath}'""".stripMargin)
+ |set location '${dir2.toURI}'""".stripMargin)
assert(sql("select * from test").count() == 4)
sql(
"""insert overwrite table test
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 55b72c625d..70750c4e05 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -77,7 +77,7 @@ class PartitionedTablePerfStatsSuite
|create external table $tableName (fieldOne long)
|partitioned by (partCol1 int, partCol2 int)
|stored as parquet
- |location "${dir.getAbsolutePath}"""".stripMargin)
+ |location "${dir.toURI}"""".stripMargin)
if (repair) {
spark.sql(s"msck repair table $tableName")
}
@@ -102,7 +102,7 @@ class PartitionedTablePerfStatsSuite
spark.sql(s"""
|create table $tableName (fieldOne long, partCol1 int, partCol2 int)
|using parquet
- |options (path "${dir.getAbsolutePath}")
+ |options (path "${dir.toURI}")
|partitioned by (partCol1, partCol2)""".stripMargin)
if (repair) {
spark.sql(s"msck repair table $tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index 68df809434..cc26b32184 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -146,7 +146,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
| c1 INT COMMENT 'bla',
| c2 STRING
|)
- |LOCATION '$dir'
+ |LOCATION '${dir.toURI}'
|TBLPROPERTIES (
| 'prop1' = 'value1',
| 'prop2' = 'value2'
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 8803ea36de..b040f26d28 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -57,7 +57,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
\"separatorChar\" = \",\",
\"quoteChar\" = \"\\\"\",
\"escapeChar\" = \"\\\\\")
- LOCATION '$tempDir'
+ LOCATION '${tempDir.toURI}'
""")
spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 77285282a6..0af331e67b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -87,7 +87,7 @@ class HiveDDLSuite
s"""
|create table $tabName
|stored as parquet
- |location '$tmpDir'
+ |location '${tmpDir.toURI}'
|as select 1, '3'
""".stripMargin)
@@ -269,7 +269,7 @@ class HiveDDLSuite
s"""
|CREATE EXTERNAL TABLE $externalTab (key INT, value STRING)
|PARTITIONED BY (ds STRING, hr STRING)
- |LOCATION '$basePath'
+ |LOCATION '${tmpDir.toURI}'
""".stripMargin)
// Before data insertion, all the directory are empty
@@ -678,14 +678,10 @@ class HiveDDLSuite
} else {
assert(!fs.exists(new Path(tmpDir.toString)))
}
- sql(s"CREATE DATABASE $dbName Location '$tmpDir'")
+ sql(s"CREATE DATABASE $dbName Location '${tmpDir.toURI.getPath.stripSuffix("/")}'")
val db1 = catalog.getDatabaseMetadata(dbName)
- val dbPath = "file:" + tmpDir
- assert(db1 == CatalogDatabase(
- dbName,
- "",
- if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
- Map.empty))
+ val dbPath = tmpDir.toURI.toString.stripSuffix("/")
+ assert(db1 == CatalogDatabase(dbName, "", dbPath, Map.empty))
sql("USE db1")
sql(s"CREATE TABLE $tabName as SELECT 1")
@@ -713,10 +709,6 @@ class HiveDDLSuite
}
}
- private def appendTrailingSlash(path: String): String = {
- if (!path.endsWith(File.separator)) path + File.separator else path
- }
-
private def dropDatabase(cascade: Boolean, tableExists: Boolean): Unit = {
val dbName = "db1"
val dbPath = new Path(spark.sessionState.conf.warehousePath)
@@ -724,7 +716,7 @@ class HiveDDLSuite
sql(s"CREATE DATABASE $dbName")
val catalog = spark.sessionState.catalog
- val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db"
+ val expectedDBLocation = s"file:${dbPath.toUri.getPath.stripSuffix("/")}/$dbName.db"
val db1 = catalog.getDatabaseMetadata(dbName)
assert(db1 == CatalogDatabase(
dbName,
@@ -857,7 +849,7 @@ class HiveDDLSuite
val path = dir.getCanonicalPath
spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.write.format("parquet").save(path)
- sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '$path')")
+ sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '${dir.toURI}')")
sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
// The source table should be an external data source table
@@ -894,7 +886,7 @@ class HiveDDLSuite
test("CREATE TABLE LIKE an external Hive serde table") {
val catalog = spark.sessionState.catalog
withTempDir { tmpDir =>
- val basePath = tmpDir.getCanonicalPath
+ val basePath = tmpDir.toURI
val sourceTabName = "tab1"
val targetTabName = "tab2"
withTable(sourceTabName, targetTabName) {
@@ -1112,7 +1104,7 @@ class HiveDDLSuite
Seq("parquet", "json", "orc").foreach { fileFormat =>
withTable("t1") {
withTempPath { dir =>
- val path = dir.getCanonicalPath
+ val path = dir.toURI.toString
spark.range(1).write.format(fileFormat).save(path)
sql(s"CREATE TABLE t1 USING $fileFormat OPTIONS (PATH '$path')")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f65b5f4daa..f47cf4a9c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -126,7 +126,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
s"""
|CREATE FUNCTION udtf_count_temp
|AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
- |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+ |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
""".stripMargin)
checkAnswer(
@@ -321,7 +321,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
s"""
|CREATE FUNCTION udtf_count
|AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
- |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+ |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
""".stripMargin)
checkKeywordsExist(sql("describe function udtf_count"),
@@ -644,7 +644,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
withTempDir { dir =>
val defaultDataSource = sessionState.conf.defaultDataSourceName
- val tempLocation = dir.getCanonicalPath
+ val tempLocation = dir.toURI.getPath.stripSuffix("/")
sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c1'" +
" AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c1"))
@@ -1953,16 +1953,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") {
withTempDir { dir =>
+ val path = dir.toURI.toString.stripSuffix("/")
+ val dirPath = dir.getAbsoluteFile
for (i <- 1 to 3) {
- Files.write(s"$i", new File(s"$dir/part-r-0000$i"), StandardCharsets.UTF_8)
+ Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8)
}
for (i <- 5 to 7) {
- Files.write(s"$i", new File(s"$dir/part-s-0000$i"), StandardCharsets.UTF_8)
+ Files.write(s"$i", new File(dirPath, s"part-s-0000$i"), StandardCharsets.UTF_8)
}
withTable("load_t") {
sql("CREATE TABLE load_t (a STRING)")
- sql(s"LOAD DATA LOCAL INPATH '$dir/*part-r*' INTO TABLE load_t")
+ sql(s"LOAD DATA LOCAL INPATH '$path/*part-r*' INTO TABLE load_t")
checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3")))
val m = intercept[AnalysisException] {
@@ -1971,7 +1973,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assert(m.contains("LOAD DATA input path does not exist"))
val m2 = intercept[AnalysisException] {
- sql(s"LOAD DATA LOCAL INPATH '$dir*/*part*' INTO TABLE load_t")
+ sql(s"LOAD DATA LOCAL INPATH '$path*/*part*' INTO TABLE load_t")
}.getMessage
assert(m2.contains("LOAD DATA input path allows only filename wildcard"))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
index 0ff3511c87..a20c758a83 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
@@ -43,7 +43,7 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto
| p_retailprice DOUBLE,
| p_comment STRING)
""".stripMargin)
- val testData1 = TestHive.getHiveFile("data/files/part_tiny.txt").getCanonicalPath
+ val testData1 = TestHive.getHiveFile("data/files/part_tiny.txt").toURI
sql(
s"""
|LOAD DATA LOCAL INPATH '$testData1' overwrite into table part
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 2b40469051..fe1e17dd08 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -57,7 +57,7 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
| stringField STRING
|)
|STORED AS ORC
- |LOCATION '${orcTableAsDir.getCanonicalPath}'
+ |LOCATION '${orcTableAsDir.toURI}'
""".stripMargin)
sql(
@@ -172,7 +172,7 @@ class OrcSourceSuite extends OrcSuite {
s"""CREATE TEMPORARY VIEW normal_orc_source
|USING org.apache.spark.sql.hive.orc
|OPTIONS (
- | PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}'
+ | PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
|)
""".stripMargin)
@@ -180,7 +180,7 @@ class OrcSourceSuite extends OrcSuite {
s"""CREATE TEMPORARY VIEW normal_orc_as_source
|USING org.apache.spark.sql.hive.orc
|OPTIONS (
- | PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}'
+ | PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
|)
""".stripMargin)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2ce60fe589..d3e04d4bf2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -81,7 +81,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${partitionedTableDir.getCanonicalPath}'
+ location '${partitionedTableDir.toURI}'
""")
sql(s"""
@@ -95,7 +95,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${partitionedTableDirWithKey.getCanonicalPath}'
+ location '${partitionedTableDirWithKey.toURI}'
""")
sql(s"""
@@ -108,7 +108,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- location '${new File(normalTableDir, "normal").getCanonicalPath}'
+ location '${new File(normalTableDir, "normal").toURI}'
""")
sql(s"""
@@ -124,7 +124,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+ LOCATION '${partitionedTableDirWithComplexTypes.toURI}'
""")
sql(s"""
@@ -140,7 +140,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+ LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
""")
sql(
@@ -561,7 +561,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test("SPARK-15248: explicitly added partitions should be readable") {
withTable("test_added_partitions", "test_temp") {
withTempDir { src =>
- val partitionDir = new File(src, "partition").getCanonicalPath
+ val partitionDir = new File(src, "partition").toURI
sql(
"""
|CREATE TABLE test_added_partitions (a STRING)
@@ -636,7 +636,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
CREATE TEMPORARY VIEW partitioned_parquet
USING org.apache.spark.sql.parquet
OPTIONS (
- path '${partitionedTableDir.getCanonicalPath}'
+ path '${partitionedTableDir.toURI}'
)
""")
@@ -644,7 +644,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
CREATE TEMPORARY VIEW partitioned_parquet_with_key
USING org.apache.spark.sql.parquet
OPTIONS (
- path '${partitionedTableDirWithKey.getCanonicalPath}'
+ path '${partitionedTableDirWithKey.toURI}'
)
""")
@@ -652,7 +652,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
CREATE TEMPORARY VIEW normal_parquet
USING org.apache.spark.sql.parquet
OPTIONS (
- path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+ path '${new File(partitionedTableDir, "p=1").toURI}'
)
""")
@@ -660,7 +660,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
USING org.apache.spark.sql.parquet
OPTIONS (
- path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+ path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
)
""")
@@ -668,7 +668,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
USING org.apache.spark.sql.parquet
OPTIONS (
- path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+ path '${partitionedTableDirWithComplexTypes.toURI}'
)
""")
}
@@ -701,8 +701,6 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
test("SPARK-8811: compatibility with array of struct in Hive") {
withTempPath { dir =>
- val path = dir.getCanonicalPath
-
withTable("array_of_struct") {
val conf = Seq(
HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
@@ -712,7 +710,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
withSQLConf(conf: _*) {
sql(
s"""CREATE TABLE array_of_struct
- |STORED AS PARQUET LOCATION '$path'
+ |STORED AS PARQUET LOCATION '${dir.toURI}'
|AS SELECT
| '1st' AS a,
| '2nd' AS b,
@@ -720,7 +718,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
""".stripMargin)
checkAnswer(
- spark.read.parquet(path),
+ spark.read.parquet(dir.getCanonicalPath),
Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 5ba09a54af..eca7c79465 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -175,6 +175,12 @@ private[streaming] class ReceiverSupervisorImpl(
}
override protected def onStop(message: String, error: Option[Throwable]) {
+ receivedBlockHandler match {
+ case handler: WriteAheadLogBasedBlockHandler =>
+ // Write ahead log should be closed.
+ handler.stop()
+ case _ =>
+ }
registeredBlockGenerators.asScala.foreach { _.stop() }
env.rpcEnv.stop(endpoint)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a1e9d1e023..7fcf45e7de 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
+import java.io._
import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentLinkedQueue
@@ -629,7 +629,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
{ fileInputDStream.batchTimeToSelectedFiles.values.flatten }
- filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
+ filenames.map(_.split("/").last.toInt).toSeq.sorted
}
try {
@@ -755,7 +755,15 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet)
}
} finally {
- Utils.deleteRecursively(testDir)
+ try {
+ // As the driver shuts down in the middle of processing and the thread above sleeps
+ // for a while, `testDir` may not be closed correctly at this point, which causes
+ // test failures on Windows.
+ Utils.deleteRecursively(testDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
}
}
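One more detail from this suite, a few hunks up: the recovered file names come from Hadoop Path/URI strings, which use "/" on every platform, so splitting on File.separator (a backslash on Windows) would never split there; splitting on the literal "/" is the portable choice. A tiny sketch with a made-up file name:

object FileNameSplitExample extends App {
  // Hypothetical checkpointed file name as recorded by FileInputDStream
  // (a URI-style string); the value below is only an example.
  val recorded = "file:/tmp/stream-test/7"
  val batchNumber = recorded.split("/").last.toInt
  println(batchNumber) // 7
}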