aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache')
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClient.java89
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java41
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java60
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServer.java103
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java42
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java82
-rwxr-xr-xcore/src/main/java/org/apache/spark/network/netty/PathResolver.java29
7 files changed, 446 insertions, 0 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
new file mode 100644
index 0000000000..20a7a3aa8c
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.channel.socket.oio.OioSocketChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class FileClient {
+
+ private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private FileClientHandler handler = null;
+ private Channel channel = null;
+ private Bootstrap bootstrap = null;
+ private int connectTimeout = 60*1000; // 1 min
+
+ public FileClient(FileClientHandler handler, int connectTimeout) {
+ this.handler = handler;
+ this.connectTimeout = connectTimeout;
+ }
+
+ public void init() {
+ bootstrap = new Bootstrap();
+ bootstrap.group(new OioEventLoopGroup())
+ .channel(OioSocketChannel.class)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
+ .handler(new FileClientChannelInitializer(handler));
+ }
+
+ public void connect(String host, int port) {
+ try {
+ // Start the connection attempt.
+ channel = bootstrap.connect(host, port).sync().channel();
+ // ChannelFuture cf = channel.closeFuture();
+ //cf.addListener(new ChannelCloseListener(this));
+ } catch (InterruptedException e) {
+ close();
+ }
+ }
+
+ public void waitForClose() {
+ try {
+ channel.closeFuture().sync();
+ } catch (InterruptedException e) {
+ LOG.warn("FileClient interrupted", e);
+ }
+ }
+
+ public void sendRequest(String file) {
+ //assert(file == null);
+ //assert(channel == null);
+ channel.write(file + "\r\n");
+ }
+
+ public void close() {
+ if(channel != null) {
+ channel.close();
+ channel = null;
+ }
+ if ( bootstrap!=null) {
+ bootstrap.shutdown();
+ bootstrap = null;
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
new file mode 100644
index 0000000000..65ee15d63b
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import io.netty.buffer.BufType;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.string.StringEncoder;
+
+
+class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ private FileClientHandler fhandler;
+
+ public FileClientChannelInitializer(FileClientHandler handler) {
+ fhandler = handler;
+ }
+
+ @Override
+ public void initChannel(SocketChannel channel) {
+ // file no more than 2G
+ channel.pipeline()
+ .addLast("encoder", new StringEncoder(BufType.BYTE))
+ .addLast("handler", fhandler);
+ }
+}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
new file mode 100644
index 0000000000..c4aa2669e0
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundByteHandlerAdapter;
+
+
+abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
+
+ private FileHeader currentHeader = null;
+
+ private volatile boolean handlerCalled = false;
+
+ public boolean isComplete() {
+ return handlerCalled;
+ }
+
+ public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header);
+ public abstract void handleError(String blockId);
+
+ @Override
+ public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
+ // Use direct buffer if possible.
+ return ctx.alloc().ioBuffer();
+ }
+
+ @Override
+ public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
+ // get header
+ if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) {
+ currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
+ }
+ // get file
+ if(in.readableBytes() >= currentHeader.fileLen()) {
+ handle(ctx, in, currentHeader);
+ handlerCalled = true;
+ currentHeader = null;
+ ctx.close();
+ }
+ }
+
+}
+
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
new file mode 100644
index 0000000000..666432474d
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import java.net.InetSocketAddress;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.channel.socket.oio.OioServerSocketChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Server that accept the path of a file an echo back its content.
+ */
+class FileServer {
+
+ private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+
+ private ServerBootstrap bootstrap = null;
+ private ChannelFuture channelFuture = null;
+ private int port = 0;
+ private Thread blockingThread = null;
+
+ public FileServer(PathResolver pResolver, int port) {
+ InetSocketAddress addr = new InetSocketAddress(port);
+
+ // Configure the server.
+ bootstrap = new ServerBootstrap();
+ bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
+ .channel(OioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 100)
+ .option(ChannelOption.SO_RCVBUF, 1500)
+ .childHandler(new FileServerChannelInitializer(pResolver));
+ // Start the server.
+ channelFuture = bootstrap.bind(addr);
+ try {
+ // Get the address we bound to.
+ InetSocketAddress boundAddress =
+ ((InetSocketAddress) channelFuture.sync().channel().localAddress());
+ this.port = boundAddress.getPort();
+ } catch (InterruptedException ie) {
+ this.port = 0;
+ }
+ }
+
+ /**
+ * Start the file server asynchronously in a new thread.
+ */
+ public void start() {
+ blockingThread = new Thread() {
+ public void run() {
+ try {
+ channelFuture.channel().closeFuture().sync();
+ LOG.info("FileServer exiting");
+ } catch (InterruptedException e) {
+ LOG.error("File server start got interrupted", e);
+ }
+ // NOTE: bootstrap is shutdown in stop()
+ }
+ };
+ blockingThread.setDaemon(true);
+ blockingThread.start();
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void stop() {
+ // Close the bound channel.
+ if (channelFuture != null) {
+ channelFuture.channel().close();
+ channelFuture = null;
+ }
+ // Shutdown bootstrap.
+ if (bootstrap != null) {
+ bootstrap.shutdown();
+ bootstrap = null;
+ }
+ // TODO: Shutdown all accepted channels as well ?
+ }
+}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
new file mode 100644
index 0000000000..833af1632d
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.Delimiters;
+import io.netty.handler.codec.string.StringDecoder;
+
+
+class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ PathResolver pResolver;
+
+ public FileServerChannelInitializer(PathResolver pResolver) {
+ this.pResolver = pResolver;
+ }
+
+ @Override
+ public void initChannel(SocketChannel channel) {
+ channel.pipeline()
+ .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
+ .addLast("strDecoder", new StringDecoder())
+ .addLast("handler", new FileServerHandler(pResolver));
+ }
+}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
new file mode 100644
index 0000000000..d3d57a0255
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import java.io.File;
+import java.io.FileInputStream;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.DefaultFileRegion;
+
+
+class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
+
+ PathResolver pResolver;
+
+ public FileServerHandler(PathResolver pResolver){
+ this.pResolver = pResolver;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, String blockId) {
+ String path = pResolver.getAbsolutePath(blockId);
+ // if getFilePath returns null, close the channel
+ if (path == null) {
+ //ctx.close();
+ return;
+ }
+ File file = new File(path);
+ if (file.exists()) {
+ if (!file.isFile()) {
+ //logger.info("Not a file : " + file.getAbsolutePath());
+ ctx.write(new FileHeader(0, blockId).buffer());
+ ctx.flush();
+ return;
+ }
+ long length = file.length();
+ if (length > Integer.MAX_VALUE || length <= 0) {
+ //logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length);
+ ctx.write(new FileHeader(0, blockId).buffer());
+ ctx.flush();
+ return;
+ }
+ int len = new Long(length).intValue();
+ //logger.info("Sending block "+blockId+" filelen = "+len);
+ //logger.info("header = "+ (new FileHeader(len, blockId)).buffer());
+ ctx.write((new FileHeader(len, blockId)).buffer());
+ try {
+ ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
+ .getChannel(), 0, file.length()));
+ } catch (Exception e) {
+ //logger.warning("Exception when sending file : " + file.getAbsolutePath());
+ e.printStackTrace();
+ }
+ } else {
+ //logger.warning("File not found: " + file.getAbsolutePath());
+ ctx.write(new FileHeader(0, blockId).buffer());
+ }
+ ctx.flush();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ cause.printStackTrace();
+ ctx.close();
+ }
+}
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
new file mode 100755
index 0000000000..94c034cad0
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+
+public interface PathResolver {
+ /**
+ * Get the absolute path of the file
+ *
+ * @param fileId
+ * @return the absolute path of file
+ */
+ public String getAbsolutePath(String fileId);
+}