Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/FileClient.scala | 85
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala | 50
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/FileServer.scala | 91
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala | 68
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala | 2
9 files changed, 387 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala b/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
new file mode 100644
index 0000000000..c6d35f73db
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.util.concurrent.TimeUnit
+
+import io.netty.bootstrap.Bootstrap
+import io.netty.channel.{Channel, ChannelOption, EventLoopGroup}
+import io.netty.channel.oio.OioEventLoopGroup
+import io.netty.channel.socket.oio.OioSocketChannel
+
+import org.apache.spark.Logging
+
+class FileClient(handler: FileClientHandler, connectTimeout: Int) extends Logging {
+
+ private var channel: Channel = _
+ private var bootstrap: Bootstrap = _
+ private var group: EventLoopGroup = _
+ private val sendTimeout = 60
+
+ def init(): Unit = {
+ group = new OioEventLoopGroup
+ bootstrap = new Bootstrap
+ bootstrap.group(group)
+ .channel(classOf[OioSocketChannel])
+ .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
+ .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(connectTimeout))
+ .handler(new FileClientChannelInitializer(handler))
+ }
+
+ def connect(host: String, port: Int) {
+ try {
+ channel = bootstrap.connect(host, port).sync().channel()
+ } catch {
+ case e: InterruptedException =>
+ logWarning("FileClient interrupted while trying to connect", e)
+ close()
+ }
+ }
+
+ def waitForClose(): Unit = {
+ try {
+ channel.closeFuture.sync()
+ } catch {
+ case e: InterruptedException =>
+ logWarning("FileClient interrupted", e)
+ }
+ }
+
+ def sendRequest(file: String): Unit = {
+ try {
+ val bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS)
+ if (!bSent) {
+ throw new RuntimeException("Failed to send")
+ }
+ } catch {
+ case e: InterruptedException =>
+ logError("Error", e)
+ }
+ }
+
+ def close(): Unit = {
+ if (group != null) {
+ group.shutdownGracefully()
+ group = null
+ bootstrap = null
+ }
+ }
+}
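
From the fetching side, these pieces are used by subclassing FileClientHandler and driving the connection through FileClient. The sketch below is not part of this commit: the handler body, host, port, and block id string are illustrative assumptions, kept only to show the init/connect/sendRequest/waitForClose/close sequence.

    import io.netty.buffer.ByteBuf
    import io.netty.channel.ChannelHandlerContext

    import org.apache.spark.network.netty.{FileClient, FileClientHandler, FileHeader}
    import org.apache.spark.storage.BlockId

    object FileClientExample {
      def main(args: Array[String]): Unit = {
        // Invoked by channelRead0 once the full payload described by the header is readable.
        val handler = new FileClientHandler {
          override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader): Unit = {
            val bytes = new Array[Byte](header.fileLen)
            in.readBytes(bytes)
            println(s"received ${bytes.length} bytes for block ${header.blockId}")
          }
          override def handleError(blockId: BlockId): Unit = {
            println(s"failed to fetch block $blockId")
          }
        }

        val client = new FileClient(handler, 60000) // connect timeout in milliseconds
        client.init()
        client.connect("localhost", 12345)   // illustrative host and port
        client.sendRequest("shuffle_0_0_0")  // a block id; sendRequest appends the "\r\n" delimiter
        client.waitForClose()
        client.close()
      }
    }
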
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala b/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
new file mode 100644
index 0000000000..f4261c13f7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.channel.ChannelInitializer
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.codec.string.StringEncoder
+
+
+class FileClientChannelInitializer(handler: FileClientHandler)
+ extends ChannelInitializer[SocketChannel] {
+
+ def initChannel(channel: SocketChannel) {
+ channel.pipeline.addLast("encoder", new StringEncoder).addLast("handler", handler)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
new file mode 100644
index 0000000000..017302ec7d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.buffer.ByteBuf
+import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
+
+import org.apache.spark.storage.BlockId
+
+
+abstract class FileClientHandler extends SimpleChannelInboundHandler[ByteBuf] {
+
+ private var currentHeader: FileHeader = null
+
+ @volatile
+ private var handlerCalled: Boolean = false
+
+ def isComplete: Boolean = handlerCalled
+
+ def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader)
+
+ def handleError(blockId: BlockId)
+
+ override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf) {
+ if (currentHeader == null && in.readableBytes >= FileHeader.HEADER_SIZE) {
+ currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE))
+ }
+ // Fire the handler only after the header and the full payload it describes have arrived.
+ if (currentHeader != null && in.readableBytes >= currentHeader.fileLen) {
+ handle(ctx, in, currentHeader)
+ handlerCalled = true
+ currentHeader = null
+ ctx.close()
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
index 136c191204..607e560ff2 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -26,7 +26,7 @@ private[spark] class FileHeader (
val fileLen: Int,
val blockId: BlockId) extends Logging {
- lazy val buffer = {
+ lazy val buffer: ByteBuf = {
val buf = Unpooled.buffer()
buf.capacity(FileHeader.HEADER_SIZE)
buf.writeInt(fileLen)
@@ -62,11 +62,10 @@ private[spark] object FileHeader {
new FileHeader(length, blockId)
}
- def main (args:Array[String]) {
+ def main(args:Array[String]) {
val header = new FileHeader(25, TestBlockId("my_block"))
val buf = header.buffer
val newHeader = FileHeader.create(buf)
System.out.println("id=" + newHeader.blockId + ",size=" + newHeader.fileLen)
}
}
-
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala b/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
new file mode 100644
index 0000000000..dff7795065
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.net.InetSocketAddress
+
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.channel.{ChannelFuture, ChannelOption, EventLoopGroup}
+import io.netty.channel.oio.OioEventLoopGroup
+import io.netty.channel.socket.oio.OioServerSocketChannel
+
+import org.apache.spark.Logging
+
+/**
+ * Server that accepts the path of a file and echoes back its content.
+ */
+class FileServer(pResolver: PathResolver, private var port: Int) extends Logging {
+
+ private val addr: InetSocketAddress = new InetSocketAddress(port)
+ private var bossGroup: EventLoopGroup = new OioEventLoopGroup
+ private var workerGroup: EventLoopGroup = new OioEventLoopGroup
+
+ private var channelFuture: ChannelFuture = {
+ val bootstrap = new ServerBootstrap
+ bootstrap.group(bossGroup, workerGroup)
+ .channel(classOf[OioServerSocketChannel])
+ .option(ChannelOption.SO_BACKLOG, java.lang.Integer.valueOf(100))
+ .option(ChannelOption.SO_RCVBUF, java.lang.Integer.valueOf(1500))
+ .childHandler(new FileServerChannelInitializer(pResolver))
+ bootstrap.bind(addr)
+ }
+
+ try {
+ val boundAddress = channelFuture.sync.channel.localAddress.asInstanceOf[InetSocketAddress]
+ port = boundAddress.getPort
+ } catch {
+ case ie: InterruptedException =>
+ port = 0
+ }
+
+ /** Start the file server asynchronously in a new thread. */
+ def start(): Unit = {
+ val blockingThread: Thread = new Thread {
+ override def run(): Unit = {
+ try {
+ channelFuture.channel.closeFuture.sync
+ logInfo("FileServer exiting")
+ } catch {
+ case e: InterruptedException =>
+ logError("File server start got interrupted", e)
+ }
+ // NOTE: the bootstrap groups are shut down in stop()
+ }
+ }
+ blockingThread.setDaemon(true)
+ blockingThread.start()
+ }
+
+ def getPort: Int = port
+
+ def stop(): Unit = {
+ if (channelFuture != null) {
+ channelFuture.channel().close().awaitUninterruptibly()
+ channelFuture = null
+ }
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully()
+ bossGroup = null
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully()
+ workerGroup = null
+ }
+ }
+}
+
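
On the serving side, FileServer binds an OIO server socket and answers each block-id line with a FileHeader followed by the file bytes. A minimal start/stop sketch, not part of this commit, follows; the resolver, the /tmp directory layout, and the use of port 0 (letting the constructor record whichever ephemeral port gets bound) are assumptions for illustration.

    import java.io.File

    import org.apache.spark.network.netty.{FileServer, PathResolver}
    import org.apache.spark.storage.{BlockId, FileSegment}

    object FileServerExample {
      def main(args: Array[String]): Unit = {
        // Resolve every block id to a whole file under an assumed directory.
        val resolver = new PathResolver {
          override def getBlockLocation(blockId: BlockId): FileSegment = {
            val file = new File("/tmp/spark-blocks", blockId.name)
            new FileSegment(file, 0, file.length)
          }
        }

        val server = new FileServer(resolver, 0)
        server.start()
        println(s"file server listening on port ${server.getPort}")
        // ... serve requests, then release the event loop groups ...
        server.stop()
      }
    }
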
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala b/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
new file mode 100644
index 0000000000..aaa2f913d0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.channel.ChannelInitializer
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
+import io.netty.handler.codec.string.StringDecoder
+
+class FileServerChannelInitializer(pResolver: PathResolver)
+ extends ChannelInitializer[SocketChannel] {
+
+ override def initChannel(channel: SocketChannel): Unit = {
+ channel.pipeline
+ .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter : _*))
+ .addLast("stringDecoder", new StringDecoder)
+ .addLast("handler", new FileServerHandler(pResolver))
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
new file mode 100644
index 0000000000..96f60b2883
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.io.FileInputStream
+
+import io.netty.channel.{DefaultFileRegion, ChannelHandlerContext, SimpleChannelInboundHandler}
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.{BlockId, FileSegment}
+
+
+class FileServerHandler(pResolver: PathResolver)
+ extends SimpleChannelInboundHandler[String] with Logging {
+
+ override def channelRead0(ctx: ChannelHandlerContext, blockIdString: String): Unit = {
+ val blockId: BlockId = BlockId(blockIdString)
+ val fileSegment: FileSegment = pResolver.getBlockLocation(blockId)
+ if (fileSegment == null) {
+ return
+ }
+ val file = fileSegment.file
+ if (file.exists) {
+ if (!file.isFile) {
+ ctx.write(new FileHeader(0, blockId).buffer)
+ ctx.flush()
+ return
+ }
+ val length: Long = fileSegment.length
+ if (length > Integer.MAX_VALUE || length <= 0) {
+ ctx.write(new FileHeader(0, blockId).buffer)
+ ctx.flush()
+ return
+ }
+ ctx.write(new FileHeader(length.toInt, blockId).buffer)
+ try {
+ val channel = new FileInputStream(file).getChannel
+ ctx.write(new DefaultFileRegion(channel, fileSegment.offset, fileSegment.length))
+ } catch {
+ case e: Exception =>
+ logError("Exception: ", e)
+ }
+ } else {
+ ctx.write(new FileHeader(0, blockId).buffer)
+ }
+ ctx.flush()
+ }
+
+ override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
+ logError("Exception: ", cause)
+ ctx.close()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala b/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala
new file mode 100644
index 0000000000..0d7695072a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import org.apache.spark.storage.{BlockId, FileSegment}
+
+trait PathResolver {
+ /** Get the file segment in which the given block resides. */
+ def getBlockLocation(blockId: BlockId): FileSegment
+}
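
For tests, a resolver backed by a prebuilt map is often enough; the sketch below is an assumed helper, not part of this commit. Returning null for an unknown block matches what FileServerHandler expects, since it simply returns without replying in that case.

    import org.apache.spark.network.netty.PathResolver
    import org.apache.spark.storage.{BlockId, FileSegment}

    // Map-backed resolver: callers pre-register the segments they want served.
    class MapPathResolver(segments: Map[BlockId, FileSegment]) extends PathResolver {
      override def getBlockLocation(blockId: BlockId): FileSegment =
        segments.getOrElse(blockId, null)
    }
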
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 7ef7aecc6a..95958e30f7 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -32,7 +32,7 @@ private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) ext
server.stop()
}
- def port: Int = server.getPort()
+ def port: Int = server.getPort
}