Diffstat (limited to 'core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala | 63
1 file changed, 63 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
new file mode 100644
index 0000000000..eb1d2604fb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.rpc.netty
+
+import java.io.File
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.server.StreamManager
+import org.apache.spark.rpc.RpcEnvFileServer
+
+/**
+ * StreamManager implementation for serving files from a NettyRpcEnv.
+ */
+private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
+ extends StreamManager with RpcEnvFileServer {
+
+ private val files = new ConcurrentHashMap[String, File]()
+ private val jars = new ConcurrentHashMap[String, File]()
+
+ override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
+ throw new UnsupportedOperationException()
+ }
+
+ override def openStream(streamId: String): ManagedBuffer = {
+ val Array(ftype, fname) = streamId.stripPrefix("/").split("/", 2)
+ val file = ftype match {
+ case "files" => files.get(fname)
+ case "jars" => jars.get(fname)
+ case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype")
+ }
+
+ require(file != null, s"File not found: $streamId")
+ new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+ }
+
+ override def addFile(file: File): String = {
+ require(files.putIfAbsent(file.getName(), file) == null,
+ s"File ${file.getName()} already registered.")
+ s"${rpcEnv.address.toSparkURL}/files/${file.getName()}"
+ }
+
+ override def addJar(file: File): String = {
+ require(jars.putIfAbsent(file.getName(), file) == null,
+ s"JAR ${file.getName()} already registered.")
+ s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
+ }
+
+}
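
A minimal, self-contained sketch of the streamId handling in openStream, runnable on its own. The object and method names here are illustrative; only the split-and-match logic mirrors the patch:

object StreamIdParsing {
  def parse(streamId: String): (String, String) = {
    // A limit of 2 means only the first '/' splits, so any later slashes
    // remain part of the file name.
    val Array(ftype, fname) = streamId.stripPrefix("/").split("/", 2)
    ftype match {
      case "files" | "jars" => (ftype, fname)
      case other => throw new IllegalArgumentException(s"Invalid file type: $other")
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("/files/app.conf"))   // (files,app.conf)
    println(parse("jars/dep-1.0.jar"))  // (jars,dep-1.0.jar)
  }
}

getChunk is left unsupported, presumably because this manager serves whole files as named streams rather than chunk-indexed block data.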
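
Registration works the same way for files and jars; the returned URL embeds the RPC address. Below is a hedged sketch of the duplicate-name behavior, using a stand-in class rather than Spark's RpcEnvFileServer, with an invented host:port:

import java.io.File
import java.util.concurrent.ConcurrentHashMap

class SketchFileServer(hostPort: String) {
  private val files = new ConcurrentHashMap[String, File]()

  def addFile(file: File): String = {
    // putIfAbsent returns null only for the first registration of a name,
    // so two concurrent addFile calls with the same name cannot both pass.
    require(files.putIfAbsent(file.getName(), file) == null,
      s"File ${file.getName()} already registered.")
    s"spark://$hostPort/files/${file.getName()}"
  }
}

object SketchFileServerDemo {
  def main(args: Array[String]): Unit = {
    val server = new SketchFileServer("192.168.1.10:7077")
    println(server.addFile(new File("/tmp/app.conf")))  // spark://192.168.1.10:7077/files/app.conf
    // Registering another file named app.conf would fail the require above.
  }
}

The ConcurrentHashMap.putIfAbsent call gives an atomic check-and-insert, which is why the require can safely enforce name uniqueness without extra locking.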