aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/spark/network/netty/FileClient.java45
-rw-r--r--core/src/main/java/spark/network/netty/FileClientChannelInitializer.java17
-rw-r--r--core/src/main/java/spark/network/netty/FileClientHandler.java25
-rw-r--r--core/src/main/java/spark/network/netty/FileServer.java17
-rw-r--r--core/src/main/java/spark/network/netty/FileServerChannelInitializer.java17
-rw-r--r--core/src/main/java/spark/network/netty/FileServerHandler.java17
-rwxr-xr-xcore/src/main/java/spark/network/netty/PathResolver.java17
7 files changed, 136 insertions, 19 deletions
diff --git a/core/src/main/java/spark/network/netty/FileClient.java b/core/src/main/java/spark/network/netty/FileClient.java
index 3a62dacbc8..0625a6d502 100644
--- a/core/src/main/java/spark/network/netty/FileClient.java
+++ b/core/src/main/java/spark/network/netty/FileClient.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.network.netty;
import io.netty.bootstrap.Bootstrap;
@@ -8,15 +25,20 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class FileClient {
+ private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private FileClientHandler handler = null;
private Channel channel = null;
private Bootstrap bootstrap = null;
+ private int connectTimeout = 60*1000; // 1 min
- public FileClient(FileClientHandler handler) {
+ public FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
+ this.connectTimeout = connectTimeout;
}
public void init() {
@@ -25,25 +47,10 @@ class FileClient {
.channel(OioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.handler(new FileClientChannelInitializer(handler));
}
- public static final class ChannelCloseListener implements ChannelFutureListener {
- private FileClient fc = null;
-
- public ChannelCloseListener(FileClient fc){
- this.fc = fc;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) {
- if (fc.bootstrap!=null){
- fc.bootstrap.shutdown();
- fc.bootstrap = null;
- }
- }
- }
-
public void connect(String host, int port) {
try {
// Start the connection attempt.
@@ -58,8 +65,8 @@ class FileClient {
public void waitForClose() {
try {
channel.closeFuture().sync();
- } catch (InterruptedException e){
- e.printStackTrace();
+ } catch (InterruptedException e) {
+ LOG.warn("FileClient interrupted", e);
}
}
diff --git a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java
index af25baf641..05ad4b61d7 100644
--- a/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java
+++ b/core/src/main/java/spark/network/netty/FileClientChannelInitializer.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.network.netty;
import io.netty.buffer.BufType;
diff --git a/core/src/main/java/spark/network/netty/FileClientHandler.java b/core/src/main/java/spark/network/netty/FileClientHandler.java
index 2069dee5ca..e8cd9801f6 100644
--- a/core/src/main/java/spark/network/netty/FileClientHandler.java
+++ b/core/src/main/java/spark/network/netty/FileClientHandler.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.network.netty;
import io.netty.buffer.ByteBuf;
@@ -9,7 +26,14 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
private FileHeader currentHeader = null;
+ private volatile boolean handlerCalled = false;
+
+ public boolean isComplete() {
+ return handlerCalled;
+ }
+
public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header);
+ public abstract void handleError(String blockId);
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
@@ -26,6 +50,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
// get file
if(in.readableBytes() >= currentHeader.fileLen()) {
handle(ctx, in, currentHeader);
+ handlerCalled = true;
currentHeader = null;
ctx.close();
}
diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java
index dd3a557ae5..9f009a61d5 100644
--- a/core/src/main/java/spark/network/netty/FileServer.java
+++ b/core/src/main/java/spark/network/netty/FileServer.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.network.netty;
import java.net.InetSocketAddress;
diff --git a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java
index 8f1f5c65cd..50c57a81a3 100644
--- a/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/spark/network/netty/FileServerChannelInitializer.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.network.netty;
import io.netty.channel.ChannelInitializer;
diff --git a/core/src/main/java/spark/network/netty/FileServerHandler.java b/core/src/main/java/spark/network/netty/FileServerHandler.java
index a78eddb1b5..176ba8da49 100644
--- a/core/src/main/java/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/spark/network/netty/FileServerHandler.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.network.netty;
import java.io.File;
diff --git a/core/src/main/java/spark/network/netty/PathResolver.java b/core/src/main/java/spark/network/netty/PathResolver.java
index 302411672c..f446c55b19 100755
--- a/core/src/main/java/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/spark/network/netty/PathResolver.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.network.netty;