author    Shixiong Zhu <shixiong@databricks.com>  2015-12-17 09:55:37 -0800
committer Shixiong Zhu <shixiong@databricks.com>  2015-12-17 09:55:37 -0800
commit    86e405f357711ae93935853a912bc13985c259db (patch)
tree      8892bb8e1986479e51daf5bfe634aa7d6285d679 /core
parent    6e0771665b3c9330fc0a5b2c7740a796b4cd712e (diff)
[SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters
This PR encodes and decodes the file name to fix the issue.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10208 from zsxwing/uri.
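Before this change, addFile and addJar built download URLs by plain string concatenation of file.getName, and the fetch path later parses that URL with new URI(url); a space (or any other character that is illegal in a URI) in the file name therefore made the fetch throw. A minimal sketch of the failure and of the encoding trick the patch uses (the object name, host, and port below are illustrative, not from the patch):

    import java.net.URI

    object SpecialCharDemo extends App {
      // A file named "abc xyz" concatenated into a URL cannot be parsed back.
      val badUrl = "spark://localhost:1234/files/abc xyz"
      try {
        new URI(badUrl)
      } catch {
        case e: java.net.URISyntaxException =>
          println(s"rejected: ${e.getMessage}") // Illegal character in path ...
      }

      // The patch's trick: hand the name to the multi-argument URI
      // constructor as a path component so it gets percent-encoded,
      // then strip the leading "/".
      val encoded =
        new URI("file", null, "localhost", -1, "/abc xyz", null, null)
          .getRawPath.substring(1)
      println(s"spark://localhost:1234/files/$encoded") // .../files/abc%20xyz
    }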
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/HttpFileServer.scala               |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                   | 26
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala              |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala              | 11
5 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 77d8ec9bb1..46f9f9e9af 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -63,12 +63,12 @@ private[spark] class HttpFileServer(
def addFile(file: File) : String = {
addFileToDir(file, fileDir)
- serverUri + "/files/" + file.getName
+ serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
}
def addJar(file: File) : String = {
addFileToDir(file, jarDir)
- serverUri + "/jars/" + file.getName
+ serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
}
def addDirectory(path: String, resourceBase: String): String = {
@@ -85,7 +85,7 @@ private[spark] class HttpFileServer(
throw new IllegalArgumentException(s"$file cannot be a directory.")
}
Files.copy(file, new File(dir, file.getName))
- dir + "/" + file.getName
+ dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index ecd9697245..394cde4fa0 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.server.StreamManager
import org.apache.spark.rpc.RpcEnvFileServer
+import org.apache.spark.util.Utils
/**
* StreamManager implementation for serving files from a NettyRpcEnv.
@@ -64,13 +65,13 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
override def addFile(file: File): String = {
require(files.putIfAbsent(file.getName(), file) == null,
s"File ${file.getName()} already registered.")
- s"${rpcEnv.address.toSparkURL}/files/${file.getName()}"
+ s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}
override def addJar(file: File): String = {
require(jars.putIfAbsent(file.getName(), file) == null,
s"JAR ${file.getName()} already registered.")
- s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
+ s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}
override def addDirectory(baseUri: String, path: File): String = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9dbe66e7ee..fce89dfccf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -331,6 +331,30 @@ private[spark] object Utils extends Logging {
}
/**
+ * A file name may contain some invalid URI characters, such as " ". This method will convert the
+ * file name to a raw path accepted by `java.net.URI(String)`.
+ *
+ * Note: the file name must not contain "/" or "\"
+ */
+ def encodeFileNameToURIRawPath(fileName: String): String = {
+ require(!fileName.contains("/") && !fileName.contains("\\"))
+ // `file` and `localhost` are not used. Just to prevent URI from parsing `fileName` as
+ // scheme or host. The prefix "/" is required because URI doesn't accept a relative path.
+ // We should remove it after we get the raw path.
+ new URI("file", null, "localhost", -1, "/" + fileName, null, null).getRawPath.substring(1)
+ }
+
+ /**
+ * Get the file name from uri's raw path and decode it. If the raw path of uri ends with "/",
+ * return the name before the last "/".
+ */
+ def decodeFileNameInURI(uri: URI): String = {
+ val rawPath = uri.getRawPath
+ val rawFileName = rawPath.split("/").last
+ new URI("file:///" + rawFileName).getPath.substring(1)
+ }
+
+ /**
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
@@ -351,7 +375,7 @@ private[spark] object Utils extends Logging {
hadoopConf: Configuration,
timestamp: Long,
useCache: Boolean) {
- val fileName = url.split("/").last
+ val fileName = decodeFileNameInURI(new URI(url))
val targetFile = new File(targetDir, fileName)
val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
if (useCache && fetchCacheEnabled) {
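Taken together, the two helpers above form an inverse pair: the file servers call encodeFileNameToURIRawPath when constructing a download URL, and fetchFile calls decodeFileNameInURI to recover the original on-disk name. A quick round-trip sketch (assuming test-scope access to the private[spark] Utils object; the host and port are illustrative):

    import java.net.URI
    import org.apache.spark.util.Utils

    // Encode at serve time, decode at fetch time; the name survives intact.
    val name = "abc xyz"
    val url =
      s"spark://localhost:1234/files/${Utils.encodeFileNameToURIRawPath(name)}"
    // url == "spark://localhost:1234/files/abc%20xyz"
    assert(Utils.decodeFileNameInURI(new URI(url)) == name)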
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 6d153eb04e..49e3e0191c 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -771,6 +771,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val tempDir = Utils.createTempDir()
val file = new File(tempDir, "file")
Files.write(UUID.randomUUID().toString(), file, UTF_8)
+ val fileWithSpecialChars = new File(tempDir, "file name")
+ Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8)
val empty = new File(tempDir, "empty")
Files.write("", empty, UTF_8);
val jar = new File(tempDir, "jar")
@@ -787,6 +789,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
Files.write(UUID.randomUUID().toString(), subFile2, UTF_8)
val fileUri = env.fileServer.addFile(file)
+ val fileWithSpecialCharsUri = env.fileServer.addFile(fileWithSpecialChars)
val emptyUri = env.fileServer.addFile(empty)
val jarUri = env.fileServer.addJar(jar)
val dir1Uri = env.fileServer.addDirectory("/dir1", dir1)
@@ -805,6 +808,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val files = Seq(
(file, fileUri),
+ (fileWithSpecialChars, fileWithSpecialCharsUri),
(empty, emptyUri),
(jar, jarUri),
(subFile1, dir1Uri + "/file1"),
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 68b0da76bc..fdb51d440e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -734,4 +734,15 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
conf.set("spark.executor.instances", "0")) === true)
}
+ test("encodeFileNameToURIRawPath") {
+ assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
+ assert(Utils.encodeFileNameToURIRawPath("abc xyz") === "abc%20xyz")
+ assert(Utils.encodeFileNameToURIRawPath("abc:xyz") === "abc:xyz")
+ }
+
+ test("decodeFileNameInURI") {
+ assert(Utils.decodeFileNameInURI(new URI("files:///abc/xyz")) === "xyz")
+ assert(Utils.decodeFileNameInURI(new URI("files:///abc")) === "abc")
+ assert(Utils.decodeFileNameInURI(new URI("files:///abc%20xyz")) === "abc xyz")
+ }
}
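One edge case the doc comment on decodeFileNameInURI mentions but the suite above does not exercise: a raw path that ends with "/" yields the segment before that final slash, because String.split drops trailing empty fields. An illustrative check in the same style (not part of the patch):

    import java.net.URI
    import org.apache.spark.util.Utils

    // "/abc/".split("/") == Array("", "abc"), so the trailing "/" is ignored.
    assert(Utils.decodeFileNameInURI(new URI("files:///abc/")) == "abc")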