aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala22
1 files changed, 21 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index a2768b4252..ecd9697245 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -25,12 +25,22 @@ import org.apache.spark.rpc.RpcEnvFileServer
/**
* StreamManager implementation for serving files from a NettyRpcEnv.
+ *
+ * Three kinds of resources can be registered in this manager, all backed by actual files:
+ *
+ * - "/files": a flat list of files; used as the backend for [[SparkContext.addFile]].
+ * - "/jars": a flat list of files; used as the backend for [[SparkContext.addJar]].
+ * - arbitrary directories; all files under the directory become available through the manager,
+ * respecting the directory's hierarchy.
+ *
+ * Only streaming (openStream) is supported.
*/
private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
extends StreamManager with RpcEnvFileServer {
private val files = new ConcurrentHashMap[String, File]()
private val jars = new ConcurrentHashMap[String, File]()
+ private val dirs = new ConcurrentHashMap[String, File]()
override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
throw new UnsupportedOperationException()
@@ -41,7 +51,10 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
val file = ftype match {
case "files" => files.get(fname)
case "jars" => jars.get(fname)
- case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype")
+ case other =>
+ val dir = dirs.get(ftype)
+ require(dir != null, s"Invalid stream URI: $ftype not found.")
+ new File(dir, fname)
}
require(file != null && file.isFile(), s"File not found: $streamId")
@@ -60,4 +73,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
}
+ override def addDirectory(baseUri: String, path: File): String = {
+ val fixedBaseUri = validateDirectoryUri(baseUri)
+ require(dirs.putIfAbsent(fixedBaseUri.stripPrefix("/"), path) == null,
+ s"URI '$fixedBaseUri' already registered.")
+ s"${rpcEnv.address.toSparkURL}$fixedBaseUri"
+ }
+
}