diff options
Diffstat (limited to 'common/network-common')
3 files changed, 5 insertions, 5 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index 5a36e18b09..b5a9d6671f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -94,7 +94,7 @@ public class TransportClientFactory implements Closeable { this.context = Preconditions.checkNotNull(context); this.conf = context.getConf(); this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps)); - this.connectionPool = new ConcurrentHashMap<SocketAddress, ClientPool>(); + this.connectionPool = new ConcurrentHashMap<>(); this.numConnectionsPerPeer = conf.numConnectionsPerPeer(); this.rand = new Random(); diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index f0e2004d2d..8a69223c88 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -64,9 +64,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { public TransportResponseHandler(Channel channel) { this.channel = channel; - this.outstandingFetches = new ConcurrentHashMap<StreamChunkId, ChunkReceivedCallback>(); - this.outstandingRpcs = new ConcurrentHashMap<Long, RpcResponseCallback>(); - this.streamCallbacks = new ConcurrentLinkedQueue<StreamCallback>(); + this.outstandingFetches = new ConcurrentHashMap<>(); + this.outstandingRpcs = new ConcurrentHashMap<>(); + this.streamCallbacks = new ConcurrentLinkedQueue<>(); this.timeOfLastRequestNs = new AtomicLong(0); } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index e2222ae085..ae7e520b2f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -63,7 +63,7 @@ public class OneForOneStreamManager extends StreamManager { // For debugging purposes, start with a random stream id to help identifying different streams. // This does not need to be globally unique, only unique to this class. nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000); - streams = new ConcurrentHashMap<Long, StreamState>(); + streams = new ConcurrentHashMap<>(); } @Override |