Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala | 60
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 33
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 71
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala | 61
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 41
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 66
7 files changed, 314 insertions, 43 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 2dfa02bd26..0d6751f3fa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -17,10 +17,13 @@
package org.apache.spark.deploy
+import java.net.URI
+
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import org.apache.spark.api.python.{PythonUtils, RedirectThread}
+import org.apache.spark.util.Utils
/**
* A main class used by spark-submit to launch Python applications. It executes python as a
@@ -28,12 +31,15 @@ import org.apache.spark.api.python.{PythonUtils, RedirectThread}
*/
object PythonRunner {
def main(args: Array[String]) {
- val primaryResource = args(0)
+ val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
-
val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
+ // Format python file paths before adding them to the PYTHONPATH
+ val formattedPythonFile = formatPath(pythonFile)
+ val formattedPyFiles = formatPaths(pyFiles)
+
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val gatewayServer = new py4j.GatewayServer(null, 0)
@@ -42,13 +48,13 @@ object PythonRunner {
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
val pathElements = new ArrayBuffer[String]
- pathElements ++= Option(pyFiles).getOrElse("").split(",")
+ pathElements ++= formattedPyFiles
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
// Launch Python process
- val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs)
+ val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
@@ -59,4 +65,50 @@ object PythonRunner {
System.exit(process.waitFor())
}
+
+ /**
+ * Format the python file path so that it can be added to the PYTHONPATH correctly.
+ *
+ * Python does not understand URI schemes in paths. Before adding python files to the
+ * PYTHONPATH, we need to extract the path from the URI. This is safe to do because we
+ * currently only support local python files.
+ */
+ def formatPath(path: String, testWindows: Boolean = false): String = {
+ if (Utils.nonLocalPaths(path, testWindows).nonEmpty) {
+ throw new IllegalArgumentException("Launching Python applications through " +
+ s"spark-submit is currently only supported for local files: $path")
+ }
+ val windows = Utils.isWindows || testWindows
+ var formattedPath = if (windows) Utils.formatWindowsPath(path) else path
+
+ // Strip the URI scheme from the path
+ formattedPath =
+ new URI(formattedPath).getScheme match {
+ case Utils.windowsDrive(d) if windows => formattedPath
+ case null => formattedPath
+ case _ => new URI(formattedPath).getPath
+ }
+
+ // Guard against malformed paths potentially throwing NPE
+ if (formattedPath == null) {
+ throw new IllegalArgumentException(s"Python file path is malformed: $path")
+ }
+
+ // In Windows, the drive should not be prefixed with "/"
+ // For instance, python does not understand "/C:/path/to/sheep.py"
+ formattedPath = if (windows) formattedPath.stripPrefix("/") else formattedPath
+ formattedPath
+ }
+
+ /**
+ * Format each python file path in the comma-delimited list of paths, so it can be
+ * added to the PYTHONPATH correctly.
+ */
+ def formatPaths(paths: String, testWindows: Boolean = false): Array[String] = {
+ Option(paths).getOrElse("")
+ .split(",")
+ .filter(_.nonEmpty)
+ .map { p => formatPath(p, testWindows) }
+ }
+
}
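
For illustration only (not part of the patch), here is a minimal sketch of how the new helpers are expected to behave once this change is applied; the object name is hypothetical, and the expected outputs mirror the assertions in PythonRunnerSuite further down in this diff.

// Illustrative sketch only; expected values match PythonRunnerSuite below.
import org.apache.spark.deploy.PythonRunner

object FormatPathSketch {
  def main(args: Array[String]): Unit = {
    // URI schemes are stripped so that python receives plain filesystem paths
    println(PythonRunner.formatPath("file:/spark.py"))     // prints /spark.py
    println(PythonRunner.formatPath("local:///spark.py"))  // prints /spark.py
    // Comma-delimited lists are split, empty entries dropped, and each element formatted
    println(PythonRunner.formatPaths("me.py,file:/you.py").mkString(","))  // prints me.py,/you.py
    // Non-local paths such as "hdfs:/some.py" are rejected with IllegalArgumentException
  }
}
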
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c54331c00f..7e9a9344e6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -136,9 +136,9 @@ object SparkSubmit {
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
- val pyFiles = Option(args.pyFiles).getOrElse("")
- args.files = mergeFileLists(args.files, pyFiles)
- sysProps("spark.submit.pyFiles") = pyFiles
+ args.files = mergeFileLists(args.files, args.pyFiles)
+ // Format python file paths properly before adding them to the PYTHONPATH
+ sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
}
// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
@@ -299,13 +299,18 @@ object SparkSubmit {
}
private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
- val localJarFile = new File(localJar)
- if (!localJarFile.exists()) {
- printWarning(s"Jar $localJar does not exist, skipping.")
+ val uri = Utils.resolveURI(localJar)
+ uri.getScheme match {
+ case "file" | "local" =>
+ val file = new File(uri.getPath)
+ if (file.exists()) {
+ loader.addURL(file.toURI.toURL)
+ } else {
+ printWarning(s"Local jar $file does not exist, skipping.")
+ }
+ case _ =>
+ printWarning(s"Skip remote jar $uri.")
}
-
- val url = localJarFile.getAbsoluteFile.toURI.toURL
- loader.addURL(url)
}
/**
@@ -318,7 +323,7 @@ object SparkSubmit {
/**
* Return whether the given primary resource represents a shell.
*/
- private def isShell(primaryResource: String): Boolean = {
+ private[spark] def isShell(primaryResource: String): Boolean = {
primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 0cc05fb95a..bf449afae6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -118,7 +118,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
} catch {
case e: Exception =>
- SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource)
+ SparkSubmit.printErrorAndExit("Cannot load main class from JAR: " + primaryResource)
return
}
}
@@ -148,6 +148,18 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
}
+ // Require all python files to be local, so we can add them to the PYTHONPATH
+ if (isPython) {
+ if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
+ SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
+ }
+ val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
+ if (nonLocalPyFiles.nonEmpty) {
+ SparkSubmit.printErrorAndExit(
+ s"Only local additional python files are supported: $nonLocalPyFiles")
+ }
+ }
+
if (master.startsWith("yarn")) {
val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {
@@ -263,19 +275,19 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
parse(tail)
case ("--files") :: value :: tail =>
- files = value
+ files = Utils.resolveURIs(value)
parse(tail)
case ("--py-files") :: value :: tail =>
- pyFiles = value
+ pyFiles = Utils.resolveURIs(value)
parse(tail)
case ("--archives") :: value :: tail =>
- archives = value
+ archives = Utils.resolveURIs(value)
parse(tail)
case ("--jars") :: value :: tail =>
- jars = value
+ jars = Utils.resolveURIs(value)
parse(tail)
case ("--help" | "-h") :: tail =>
@@ -296,7 +308,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
val errMessage = s"Unrecognized option '$value'."
SparkSubmit.printErrorAndExit(errMessage)
case v =>
- primaryResource = v
+ primaryResource =
+ if (!SparkSubmit.isShell(v)) {
+ Utils.resolveURI(v).toString
+ } else {
+ v
+ }
inSparkOpts = false
isPython = SparkSubmit.isPython(v)
parse(tail)
@@ -327,8 +344,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| --name NAME A name of your application.
| --jars JARS Comma-separated list of local jars to include on the driver
| and executor classpaths.
- | --py-files PY_FILES Comma-separated list of .zip or .egg files to place on the
- | PYTHONPATH for Python apps.
+ | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
+ | on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
| --properties-file FILE Path to a file from which to load extra properties. If not
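
For context (again not part of the patch), a small sketch of what the resolution above does to option values at parse time. The object name is hypothetical, and because Utils is private[spark] the sketch assumes it lives under the org.apache.spark package; the expected strings mirror the resolveURI test added to UtilsSuite below.

// Illustrative sketch only; assumes placement under org.apache.spark since Utils is private[spark].
package org.apache.spark.deploy

import org.apache.spark.util.Utils

object ResolveArgsSketch {
  def main(args: Array[String]): Unit = {
    // "--jars one.jar,hdfs:/two.jar" is now stored with explicit URI schemes;
    // relative local paths are anchored at the current working directory ($cwd)
    println(Utils.resolveURIs("one.jar,hdfs:/two.jar"))  // prints file:$cwd/one.jar,hdfs:/two.jar
    // A bare primary resource is resolved the same way (unless it names a shell)
    println(Utils.resolveURI("thejar.jar"))               // prints file:$cwd/thejar.jar
  }
}
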
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0c7cff019f..3b1b6df089 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1086,9 +1086,19 @@ private[spark] object Utils extends Logging {
}
/**
- * Return true if this is Windows.
+ * Whether the underlying operating system is Windows.
*/
- def isWindows = SystemUtils.IS_OS_WINDOWS
+ val isWindows = SystemUtils.IS_OS_WINDOWS
+
+ /**
+ * Pattern for matching a Windows drive, which consists of a single alphabetic character.
+ */
+ val windowsDrive = "([a-zA-Z])".r
+
+ /**
+ * Format a Windows path such that it can be safely passed to a URI.
+ */
+ def formatWindowsPath(path: String): String = path.replace("\\", "/")
/**
* Indicates whether Spark is currently running unit tests.
@@ -1166,4 +1176,61 @@ private[spark] object Utils extends Logging {
true
}
}
+
+ /**
+ * Return a well-formed URI for the file described by a user input string.
+ *
+ * If the supplied path does not contain a scheme, or is a relative path, it will be
+ * converted into an absolute path with a file:// scheme.
+ */
+ def resolveURI(path: String, testWindows: Boolean = false): URI = {
+
+ // In Windows, the file separator is a backslash, but this is inconsistent with the URI format
+ val windows = isWindows || testWindows
+ val formattedPath = if (windows) formatWindowsPath(path) else path
+
+ val uri = new URI(formattedPath)
+ if (uri.getPath == null) {
+ throw new IllegalArgumentException(s"Given path is malformed: $uri")
+ }
+ uri.getScheme match {
+ case windowsDrive(d) if windows =>
+ new URI("file:/" + uri.toString.stripPrefix("/"))
+ case null =>
+ // Preserve fragments for HDFS file name substitution (denoted by "#")
+ // For instance, in "abc.py#xyz.py", "xyz.py" is the name observed by the application
+ val fragment = uri.getFragment
+ val part = new File(uri.getPath).toURI
+ new URI(part.getScheme, part.getPath, fragment)
+ case _ =>
+ uri
+ }
+ }
+
+ /** Resolve a comma-separated list of paths. */
+ def resolveURIs(paths: String, testWindows: Boolean = false): String = {
+ if (paths == null || paths.trim.isEmpty) {
+ ""
+ } else {
+ paths.split(",").map { p => Utils.resolveURI(p, testWindows) }.mkString(",")
+ }
+ }
+
+ /** Return all non-local paths from a comma-separated list of paths. */
+ def nonLocalPaths(paths: String, testWindows: Boolean = false): Array[String] = {
+ val windows = isWindows || testWindows
+ if (paths == null || paths.trim.isEmpty) {
+ Array.empty
+ } else {
+ paths.split(",").filter { p =>
+ val formattedPath = if (windows) formatWindowsPath(p) else p
+ new URI(formattedPath).getScheme match {
+ case windowsDrive(d) if windows => false
+ case "local" | "file" | null => false
+ case _ => true
+ }
+ }
+ }
+ }
+
}
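
As a final illustration (not part of the patch), a sketch of how nonLocalPaths classifies schemes; the object name is hypothetical, the sketch again assumes placement under org.apache.spark since Utils is private[spark], and the expected values mirror the "nonLocalPaths" test in UtilsSuite below.

// Illustrative sketch only; expected values match the "nonLocalPaths" test below.
package org.apache.spark.util

object NonLocalPathsSketch {
  def main(args: Array[String]): Unit = {
    // "file", "local", and scheme-less paths count as local and are filtered out
    println(Utils.nonLocalPaths("file:/a.jar,local:/b.jar,c.jar").mkString(","))  // prints nothing
    // Anything else (hdfs, s3, ...) is reported back so the caller can reject it
    println(Utils.nonLocalPaths("hdfs:/a.jar,s3:/b.jar,c.jar").mkString(","))     // prints hdfs:/a.jar,s3:/b.jar
  }
}
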
diff --git a/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala
new file mode 100644
index 0000000000..bb6251fb4b
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.scalatest.FunSuite
+
+class PythonRunnerSuite extends FunSuite {
+
+ // Test formatting a single path to be added to the PYTHONPATH
+ test("format path") {
+ assert(PythonRunner.formatPath("spark.py") === "spark.py")
+ assert(PythonRunner.formatPath("file:/spark.py") === "/spark.py")
+ assert(PythonRunner.formatPath("file:///spark.py") === "/spark.py")
+ assert(PythonRunner.formatPath("local:/spark.py") === "/spark.py")
+ assert(PythonRunner.formatPath("local:///spark.py") === "/spark.py")
+ assert(PythonRunner.formatPath("C:/a/b/spark.py", testWindows = true) === "C:/a/b/spark.py")
+ assert(PythonRunner.formatPath("/C:/a/b/spark.py", testWindows = true) === "C:/a/b/spark.py")
+ assert(PythonRunner.formatPath("file:/C:/a/b/spark.py", testWindows = true) ===
+ "C:/a/b/spark.py")
+ intercept[IllegalArgumentException] { PythonRunner.formatPath("one:two") }
+ intercept[IllegalArgumentException] { PythonRunner.formatPath("hdfs:s3:xtremeFS") }
+ intercept[IllegalArgumentException] { PythonRunner.formatPath("hdfs:/path/to/some.py") }
+ }
+
+ // Test formatting multiple comma-separated paths to be added to the PYTHONPATH
+ test("format paths") {
+ assert(PythonRunner.formatPaths("spark.py") === Array("spark.py"))
+ assert(PythonRunner.formatPaths("file:/spark.py") === Array("/spark.py"))
+ assert(PythonRunner.formatPaths("file:/app.py,local:/spark.py") ===
+ Array("/app.py", "/spark.py"))
+ assert(PythonRunner.formatPaths("me.py,file:/you.py,local:/we.py") ===
+ Array("me.py", "/you.py", "/we.py"))
+ assert(PythonRunner.formatPaths("C:/a/b/spark.py", testWindows = true) ===
+ Array("C:/a/b/spark.py"))
+ assert(PythonRunner.formatPaths("/C:/a/b/spark.py", testWindows = true) ===
+ Array("C:/a/b/spark.py"))
+ assert(PythonRunner.formatPaths("C:/free.py,pie.py", testWindows = true) ===
+ Array("C:/free.py", "pie.py"))
+ assert(PythonRunner.formatPaths("lovely.py,C:/free.py,file:/d:/fry.py", testWindows = true) ===
+ Array("lovely.py", "C:/free.py", "d:/fry.py"))
+ intercept[IllegalArgumentException] { PythonRunner.formatPaths("one:two,three") }
+ intercept[IllegalArgumentException] { PythonRunner.formatPaths("two,three,four:five:six") }
+ intercept[IllegalArgumentException] { PythonRunner.formatPaths("hdfs:/some.py,foo.py") }
+ intercept[IllegalArgumentException] { PythonRunner.formatPaths("foo.py,hdfs:/some.py") }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 6c0deede53..02427a4a83 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -91,7 +91,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
"--jars=one.jar,two.jar,three.jar",
"--name=myApp")
val appArgs = new SparkSubmitArguments(clArgs)
- appArgs.jars should be ("one.jar,two.jar,three.jar")
+ appArgs.jars should include regex (".*one.jar,.*two.jar,.*three.jar")
appArgs.name should be ("myApp")
}
@@ -125,17 +125,17 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
val childArgsStr = childArgs.mkString(" ")
- childArgsStr should include ("--jar thejar.jar")
childArgsStr should include ("--class org.SomeClass")
- childArgsStr should include ("--addJars one.jar,two.jar,three.jar")
childArgsStr should include ("--executor-memory 5g")
childArgsStr should include ("--driver-memory 4g")
childArgsStr should include ("--executor-cores 5")
childArgsStr should include ("--arg arg1 --arg arg2")
childArgsStr should include ("--queue thequeue")
- childArgsStr should include ("--files file1.txt,file2.txt")
- childArgsStr should include ("--archives archive1.txt,archive2.txt")
childArgsStr should include ("--num-executors 6")
+ childArgsStr should include regex ("--jar .*thejar.jar")
+ childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
+ childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
+ childArgsStr should include regex ("--archives .*archive1.txt,.*archive2.txt")
mainClass should be ("org.apache.spark.deploy.yarn.Client")
classpath should have length (0)
sysProps("spark.app.name") should be ("beauty")
@@ -162,18 +162,19 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
mainClass should be ("org.SomeClass")
- classpath should contain ("thejar.jar")
- classpath should contain ("one.jar")
- classpath should contain ("two.jar")
- classpath should contain ("three.jar")
+ classpath should have length (4)
+ classpath(0) should endWith ("thejar.jar")
+ classpath(1) should endWith ("one.jar")
+ classpath(2) should endWith ("two.jar")
+ classpath(3) should endWith ("three.jar")
sysProps("spark.app.name") should be ("trill")
- sysProps("spark.jars") should be ("one.jar,two.jar,three.jar,thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.executor.cores") should be ("5")
sysProps("spark.yarn.queue") should be ("thequeue")
- sysProps("spark.yarn.dist.files") should be ("file1.txt,file2.txt")
- sysProps("spark.yarn.dist.archives") should be ("archive1.txt,archive2.txt")
sysProps("spark.executor.instances") should be ("6")
+ sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
+ sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
+ sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
sysProps("SPARK_SUBMIT") should be ("true")
}
@@ -190,11 +191,13 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
val childArgsStr = childArgs.mkString(" ")
- childArgsStr.startsWith("--memory 4g --cores 5 --supervise") should be (true)
- childArgsStr should include ("launch spark://h:p thejar.jar org.SomeClass arg1 arg2")
+ childArgsStr should startWith ("--memory 4g --cores 5 --supervise")
+ childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2")
mainClass should be ("org.apache.spark.deploy.Client")
- classpath should have length (0)
- sysProps should have size (2) // contains --jar entry and SPARK_SUBMIT
+ classpath should have size (0)
+ sysProps should have size (2)
+ sysProps.keys should contain ("spark.jars")
+ sysProps.keys should contain ("SPARK_SUBMIT")
}
test("handles standalone client mode") {
@@ -211,7 +214,8 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
mainClass should be ("org.SomeClass")
- classpath should contain ("thejar.jar")
+ classpath should have length (1)
+ classpath(0) should endWith ("thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
}
@@ -230,7 +234,8 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
mainClass should be ("org.SomeClass")
- classpath should contain ("thejar.jar")
+ classpath should have length (1)
+ classpath(0) should endWith ("thejar.jar")
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index cf9e20d347..0aad882ed7 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
import scala.util.Random
import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
+import java.net.URI
import java.nio.{ByteBuffer, ByteOrder}
import com.google.common.base.Charsets
@@ -168,5 +169,68 @@ class UtilsSuite extends FunSuite {
assert(result.size.equals(1))
assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
}
-}
+ test("resolveURI") {
+ def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
+ assume(before.split(",").length == 1)
+ assert(Utils.resolveURI(before, testWindows) === new URI(after))
+ assert(Utils.resolveURI(after, testWindows) === new URI(after))
+ assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
+ assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
+ }
+ val cwd = System.getProperty("user.dir")
+ assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar")
+ assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:/root/spark.jar#app.jar")
+ assertResolves("spark.jar", s"file:$cwd/spark.jar")
+ assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar#app.jar")
+ assertResolves("C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+ assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+ assertResolves("file:/C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+ assertResolves("file:///C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
+ assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
+ intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
+ intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
+
+ // Test resolving comma-delimited paths
+ assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
+ assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
+ assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
+ s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
+ assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
+ s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
+ assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) ===
+ s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
+ }
+
+ test("nonLocalPaths") {
+ assert(Utils.nonLocalPaths("spark.jar") === Array.empty)
+ assert(Utils.nonLocalPaths("file:/spark.jar") === Array.empty)
+ assert(Utils.nonLocalPaths("file:///spark.jar") === Array.empty)
+ assert(Utils.nonLocalPaths("local:/spark.jar") === Array.empty)
+ assert(Utils.nonLocalPaths("local:///spark.jar") === Array.empty)
+ assert(Utils.nonLocalPaths("hdfs:/spark.jar") === Array("hdfs:/spark.jar"))
+ assert(Utils.nonLocalPaths("hdfs:///spark.jar") === Array("hdfs:///spark.jar"))
+ assert(Utils.nonLocalPaths("file:/spark.jar,local:/smart.jar,family.py") === Array.empty)
+ assert(Utils.nonLocalPaths("local:/spark.jar,file:/smart.jar,family.py") === Array.empty)
+ assert(Utils.nonLocalPaths("hdfs:/spark.jar,s3:/smart.jar") ===
+ Array("hdfs:/spark.jar", "s3:/smart.jar"))
+ assert(Utils.nonLocalPaths("hdfs:/spark.jar,s3:/smart.jar,local.py,file:/hello/pi.py") ===
+ Array("hdfs:/spark.jar", "s3:/smart.jar"))
+ assert(Utils.nonLocalPaths("local.py,hdfs:/spark.jar,file:/hello/pi.py,s3:/smart.jar") ===
+ Array("hdfs:/spark.jar", "s3:/smart.jar"))
+
+ // Test Windows paths
+ assert(Utils.nonLocalPaths("C:/some/path.jar", testWindows = true) === Array.empty)
+ assert(Utils.nonLocalPaths("file:/C:/some/path.jar", testWindows = true) === Array.empty)
+ assert(Utils.nonLocalPaths("file:///C:/some/path.jar", testWindows = true) === Array.empty)
+ assert(Utils.nonLocalPaths("local:/C:/some/path.jar", testWindows = true) === Array.empty)
+ assert(Utils.nonLocalPaths("local:///C:/some/path.jar", testWindows = true) === Array.empty)
+ assert(Utils.nonLocalPaths("hdfs:/a.jar,C:/my.jar,s3:/another.jar", testWindows = true) ===
+ Array("hdfs:/a.jar", "s3:/another.jar"))
+ assert(Utils.nonLocalPaths("D:/your.jar,hdfs:/a.jar,s3:/another.jar", testWindows = true) ===
+ Array("hdfs:/a.jar", "s3:/another.jar"))
+ assert(Utils.nonLocalPaths("hdfs:/a.jar,s3:/another.jar,e:/our.jar", testWindows = true) ===
+ Array("hdfs:/a.jar", "s3:/another.jar"))
+ }
+
+}