aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/util/Utils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/Utils.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala71
1 files changed, 69 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0c7cff019f..3b1b6df089 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1086,9 +1086,19 @@ private[spark] object Utils extends Logging {
}
/**
- * Return true if this is Windows.
+ * Whether the underlying operating system is Windows.
*/
- def isWindows = SystemUtils.IS_OS_WINDOWS
+ val isWindows = SystemUtils.IS_OS_WINDOWS
+
+ /**
+ * Pattern for matching a Windows drive, which contains only a single alphabet character.
+ */
+ val windowsDrive = "([a-zA-Z])".r
+
+ /**
+ * Format a Windows path such that it can be safely passed to a URI.
+ */
+ def formatWindowsPath(path: String): String = path.replace("\\", "/")
/**
* Indicates whether Spark is currently running unit tests.
@@ -1166,4 +1176,61 @@ private[spark] object Utils extends Logging {
true
}
}
+
+ /**
+ * Return a well-formed URI for the file described by a user input string.
+ *
+ * If the supplied path does not contain a scheme, or is a relative path, it will be
+ * converted into an absolute path with a file:// scheme.
+ */
+ def resolveURI(path: String, testWindows: Boolean = false): URI = {
+
+ // In Windows, the file separator is a backslash, but this is inconsistent with the URI format
+ val windows = isWindows || testWindows
+ val formattedPath = if (windows) formatWindowsPath(path) else path
+
+ val uri = new URI(formattedPath)
+ if (uri.getPath == null) {
+ throw new IllegalArgumentException(s"Given path is malformed: $uri")
+ }
+ uri.getScheme match {
+ case windowsDrive(d) if windows =>
+ new URI("file:/" + uri.toString.stripPrefix("/"))
+ case null =>
+ // Preserve fragments for HDFS file name substitution (denoted by "#")
+ // For instance, in "abc.py#xyz.py", "xyz.py" is the name observed by the application
+ val fragment = uri.getFragment
+ val part = new File(uri.getPath).toURI
+ new URI(part.getScheme, part.getPath, fragment)
+ case _ =>
+ uri
+ }
+ }
+
+ /** Resolve a comma-separated list of paths. */
+ def resolveURIs(paths: String, testWindows: Boolean = false): String = {
+ if (paths == null || paths.trim.isEmpty) {
+ ""
+ } else {
+ paths.split(",").map { p => Utils.resolveURI(p, testWindows) }.mkString(",")
+ }
+ }
+
+ /** Return all non-local paths from a comma-separated list of paths. */
+ def nonLocalPaths(paths: String, testWindows: Boolean = false): Array[String] = {
+ val windows = isWindows || testWindows
+ if (paths == null || paths.trim.isEmpty) {
+ Array.empty
+ } else {
+ paths.split(",").filter { p =>
+ val formattedPath = if (windows) formatWindowsPath(p) else p
+ new URI(formattedPath).getScheme match {
+ case windowsDrive(d) if windows => false
+ case "local" | "file" | null => false
+ case _ => true
+ }
+ }
+ }
+ }
+
}