aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-08-10 20:36:54 -0700
committerAaron Davidson <aaron@databricks.com>2014-08-10 20:36:54 -0700
commitba28a8fcbc3ba432e7ea4d6f0b535450a6ec96c6 (patch)
treec6d1b6369faaf454044986952d48dbc36c75a30c /core/src/main/java/org
parentb715aa0c8090cd57158ead2a1b35632cb98a6277 (diff)
downloadspark-ba28a8fcbc3ba432e7ea4d6f0b535450a6ec96c6.tar.gz
spark-ba28a8fcbc3ba432e7ea4d6f0b535450a6ec96c6.tar.bz2
spark-ba28a8fcbc3ba432e7ea4d6f0b535450a6ec96c6.zip
[SPARK-2936] Migrate Netty network module from Java to Scala
The Netty network module was originally written when Scala 2.9.x had a bug that prevents a pure Scala implementation, and a subset of the files were done in Java. We have since upgraded to Scala 2.10, and can migrate all Java files now to Scala. https://github.com/netty/netty/issues/781 https://github.com/mesos/spark/pull/522 Author: Reynold Xin <rxin@apache.org> Closes #1865 from rxin/netty and squashes the following commits: 332422f [Reynold Xin] Code review feedback ca9eeee [Reynold Xin] Minor update. 7f1434b [Reynold Xin] [SPARK-2936] Migrate Netty network module from Java to Scala
Diffstat (limited to 'core/src/main/java/org')
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClient.java100
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java39
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java55
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServer.java111
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java41
-rw-r--r--core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java83
-rwxr-xr-xcore/src/main/java/org/apache/spark/network/netty/PathResolver.java26
7 files changed, 0 insertions, 455 deletions
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
deleted file mode 100644
index 0d31894d6e..0000000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import java.util.concurrent.TimeUnit;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.oio.OioEventLoopGroup;
-import io.netty.channel.socket.oio.OioSocketChannel;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class FileClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName());
-
- private final FileClientHandler handler;
- private Channel channel = null;
- private Bootstrap bootstrap = null;
- private EventLoopGroup group = null;
- private final int connectTimeout;
- private final int sendTimeout = 60; // 1 min
-
- FileClient(FileClientHandler handler, int connectTimeout) {
- this.handler = handler;
- this.connectTimeout = connectTimeout;
- }
-
- public void init() {
- group = new OioEventLoopGroup();
- bootstrap = new Bootstrap();
- bootstrap.group(group)
- .channel(OioSocketChannel.class)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
- .handler(new FileClientChannelInitializer(handler));
- }
-
- public void connect(String host, int port) {
- try {
- // Start the connection attempt.
- channel = bootstrap.connect(host, port).sync().channel();
- // ChannelFuture cf = channel.closeFuture();
- //cf.addListener(new ChannelCloseListener(this));
- } catch (InterruptedException e) {
- LOG.warn("FileClient interrupted while trying to connect", e);
- close();
- }
- }
-
- public void waitForClose() {
- try {
- channel.closeFuture().sync();
- } catch (InterruptedException e) {
- LOG.warn("FileClient interrupted", e);
- }
- }
-
- public void sendRequest(String file) {
- //assert(file == null);
- //assert(channel == null);
- try {
- // Should be able to send the message to network link channel.
- boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS);
- if (!bSent) {
- throw new RuntimeException("Failed to send");
- }
- } catch (InterruptedException e) {
- LOG.error("Error", e);
- }
- }
-
- public void close() {
- if (group != null) {
- group.shutdownGracefully();
- group = null;
- bootstrap = null;
- }
- }
-}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
deleted file mode 100644
index 264cf97d02..0000000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.string.StringEncoder;
-
-class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private final FileClientHandler fhandler;
-
- FileClientChannelInitializer(FileClientHandler handler) {
- fhandler = handler;
- }
-
- @Override
- public void initChannel(SocketChannel channel) {
- // file no more than 2G
- channel.pipeline()
- .addLast("encoder", new StringEncoder())
- .addLast("handler", fhandler);
- }
-}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
deleted file mode 100644
index 63d3d92725..0000000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-
-import org.apache.spark.storage.BlockId;
-
-abstract class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
-
- private FileHeader currentHeader = null;
-
- private volatile boolean handlerCalled = false;
-
- public boolean isComplete() {
- return handlerCalled;
- }
-
- public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header);
- public abstract void handleError(BlockId blockId);
-
- @Override
- public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
- // get header
- if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) {
- currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
- }
- // get file
- if(in.readableBytes() >= currentHeader.fileLen()) {
- handle(ctx, in, currentHeader);
- handlerCalled = true;
- currentHeader = null;
- ctx.close();
- }
- }
-
-}
-
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
deleted file mode 100644
index c93425e278..0000000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import java.net.InetSocketAddress;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.oio.OioEventLoopGroup;
-import io.netty.channel.socket.oio.OioServerSocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Server that accept the path of a file an echo back its content.
- */
-class FileServer {
-
- private static final Logger LOG = LoggerFactory.getLogger(FileServer.class.getName());
-
- private EventLoopGroup bossGroup = null;
- private EventLoopGroup workerGroup = null;
- private ChannelFuture channelFuture = null;
- private int port = 0;
-
- FileServer(PathResolver pResolver, int port) {
- InetSocketAddress addr = new InetSocketAddress(port);
-
- // Configure the server.
- bossGroup = new OioEventLoopGroup();
- workerGroup = new OioEventLoopGroup();
-
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(bossGroup, workerGroup)
- .channel(OioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 100)
- .option(ChannelOption.SO_RCVBUF, 1500)
- .childHandler(new FileServerChannelInitializer(pResolver));
- // Start the server.
- channelFuture = bootstrap.bind(addr);
- try {
- // Get the address we bound to.
- InetSocketAddress boundAddress =
- ((InetSocketAddress) channelFuture.sync().channel().localAddress());
- this.port = boundAddress.getPort();
- } catch (InterruptedException ie) {
- this.port = 0;
- }
- }
-
- /**
- * Start the file server asynchronously in a new thread.
- */
- public void start() {
- Thread blockingThread = new Thread() {
- @Override
- public void run() {
- try {
- channelFuture.channel().closeFuture().sync();
- LOG.info("FileServer exiting");
- } catch (InterruptedException e) {
- LOG.error("File server start got interrupted", e);
- }
- // NOTE: bootstrap is shutdown in stop()
- }
- };
- blockingThread.setDaemon(true);
- blockingThread.start();
- }
-
- public int getPort() {
- return port;
- }
-
- public void stop() {
- // Close the bound channel.
- if (channelFuture != null) {
- channelFuture.channel().close().awaitUninterruptibly();
- channelFuture = null;
- }
-
- // Shutdown event groups
- if (bossGroup != null) {
- bossGroup.shutdownGracefully();
- bossGroup = null;
- }
-
- if (workerGroup != null) {
- workerGroup.shutdownGracefully();
- workerGroup = null;
- }
- // TODO: Shutdown all accepted channels as well ?
- }
-}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
deleted file mode 100644
index 46efec8f8d..0000000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.DelimiterBasedFrameDecoder;
-import io.netty.handler.codec.Delimiters;
-import io.netty.handler.codec.string.StringDecoder;
-
-class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private final PathResolver pResolver;
-
- FileServerChannelInitializer(PathResolver pResolver) {
- this.pResolver = pResolver;
- }
-
- @Override
- public void initChannel(SocketChannel channel) {
- channel.pipeline()
- .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
- .addLast("stringDecoder", new StringDecoder())
- .addLast("handler", new FileServerHandler(pResolver));
- }
-}
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
deleted file mode 100644
index c0133e19c7..0000000000
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import java.io.File;
-import java.io.FileInputStream;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.DefaultFileRegion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.FileSegment;
-
-class FileServerHandler extends SimpleChannelInboundHandler<String> {
-
- private static final Logger LOG = LoggerFactory.getLogger(FileServerHandler.class.getName());
-
- private final PathResolver pResolver;
-
- FileServerHandler(PathResolver pResolver){
- this.pResolver = pResolver;
- }
-
- @Override
- public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
- BlockId blockId = BlockId.apply(blockIdString);
- FileSegment fileSegment = pResolver.getBlockLocation(blockId);
- // if getBlockLocation returns null, close the channel
- if (fileSegment == null) {
- //ctx.close();
- return;
- }
- File file = fileSegment.file();
- if (file.exists()) {
- if (!file.isFile()) {
- ctx.write(new FileHeader(0, blockId).buffer());
- ctx.flush();
- return;
- }
- long length = fileSegment.length();
- if (length > Integer.MAX_VALUE || length <= 0) {
- ctx.write(new FileHeader(0, blockId).buffer());
- ctx.flush();
- return;
- }
- int len = (int) length;
- ctx.write((new FileHeader(len, blockId)).buffer());
- try {
- ctx.write(new DefaultFileRegion(new FileInputStream(file)
- .getChannel(), fileSegment.offset(), fileSegment.length()));
- } catch (Exception e) {
- LOG.error("Exception: ", e);
- }
- } else {
- ctx.write(new FileHeader(0, blockId).buffer());
- }
- ctx.flush();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- LOG.error("Exception: ", cause);
- ctx.close();
- }
-}
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
deleted file mode 100755
index 7ad8d03efb..0000000000
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.FileSegment;
-
-public interface PathResolver {
- /** Get the file segment in which the given block resides. */
- FileSegment getBlockLocation(BlockId blockId);
-}