diff options
Diffstat (limited to 'core/src/main/java')
7 files changed, 136 insertions, 19 deletions
diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java index 3a62dacbc8..0625a6d502 100644 --- a/core/src/main/java/spark/network/netty/FileClient.java +++ b/core/src/main/java/spark/network/netty/FileClient.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.network.netty; import io.netty.bootstrap.Bootstrap; @@ -8,15 +25,20 @@ import io.netty.channel.ChannelOption; import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class FileClient { + private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); private FileClientHandler handler = null; private Channel channel = null; private Bootstrap bootstrap = null; + private int connectTimeout = 60*1000; // 1 min - public FileClient(FileClientHandler handler) { + public FileClient(FileClientHandler handler, int connectTimeout) { this.handler = handler; + this.connectTimeout = connectTimeout; } public void init() { @@ -25,25 +47,10 @@ class FileClient { .channel(OioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) .handler(new FileClientChannelInitializer(handler)); } - public static final class ChannelCloseListener implements ChannelFutureListener { - private FileClient fc = null; - - public ChannelCloseListener(FileClient fc){ - this.fc = fc; - } - - @Override - public void operationComplete(ChannelFuture future) { - if (fc.bootstrap!=null){ - fc.bootstrap.shutdown(); - fc.bootstrap = null; - } - } - } - public void connect(String host, int port) { try { // Start the connection attempt. @@ -58,8 +65,8 @@ class FileClient { public void waitForClose() { try { channel.closeFuture().sync(); - } catch (InterruptedException e){ - e.printStackTrace(); + } catch (InterruptedException e) { + LOG.warn("FileClient interrupted", e); } } diff --git a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java index af25baf641..05ad4b61d7 100644 --- a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java +++ b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.network.netty; import io.netty.buffer.BufType; diff --git a/core/src/main/java/spark/network/netty/FileClientHandler.java b/core/src/main/java/spark/network/netty/FileClientHandler.java index 2069dee5ca..e8cd9801f6 100644 --- a/core/src/main/java/spark/network/netty/FileClientHandler.java +++ b/core/src/main/java/spark/network/netty/FileClientHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.network.netty; import io.netty.buffer.ByteBuf; @@ -9,7 +26,14 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter { private FileHeader currentHeader = null; + private volatile boolean handlerCalled = false; + + public boolean isComplete() { + return handlerCalled; + } + public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header); + public abstract void handleError(String blockId); @Override public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) { @@ -26,6 +50,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter { // get file if(in.readableBytes() >= currentHeader.fileLen()) { handle(ctx, in, currentHeader); + handlerCalled = true; currentHeader = null; ctx.close(); } diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java index dd3a557ae5..9f009a61d5 100644 --- a/core/src/main/java/spark/network/netty/FileServer.java +++ b/core/src/main/java/spark/network/netty/FileServer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.network.netty; import java.net.InetSocketAddress; diff --git a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java index 8f1f5c65cd..50c57a81a3 100644 --- a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java +++ b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.network.netty; import io.netty.channel.ChannelInitializer; diff --git a/core/src/main/java/spark/network/netty/FileServerHandler.java b/core/src/main/java/spark/network/netty/FileServerHandler.java index a78eddb1b5..176ba8da49 100644 --- a/core/src/main/java/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/spark/network/netty/FileServerHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package spark.network.netty; import java.io.File; diff --git a/core/src/main/java/spark/network/netty/PathResolver.java b/core/src/main/java/spark/network/netty/PathResolver.java index 302411672c..f446c55b19 100755 --- a/core/src/main/java/spark/network/netty/PathResolver.java +++ b/core/src/main/java/spark/network/netty/PathResolver.java @@ -1,3 +1,20 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.network.netty;
|