aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
committerSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
commit0e2405490f2056728d1353abbac6f3ea177ae533 (patch)
tree1a9ec960faec7abcb8d8fbac43b6a6dc633d2297 /common
parent3871d94a695d47169720e877f77ff1e4bede43ee (diff)
downloadspark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.gz
spark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.bz2
spark-0e2405490f2056728d1353abbac6f3ea177ae533.zip
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove - Remove MaxPermGen and related options - Fix some reflection / TODOs around Java 8+ methods - Update doc references to 1.7/1.8 differences - Remove Java 7/8 related build profiles - Update some plugins for better Java 8 compatibility - Fix a few Java-related warnings For the future: - Update Java 8 examples to fully use Java 8 - Update Java tests to use lambdas for simplicity - Update Java internal implementations to use lambdas ## How was this patch tested? Existing tests Author: Sean Owen <sowen@cloudera.com> Closes #16871 from srowen/SPARK-19493.
Diffstat (limited to 'common')
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java111
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java16
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java3
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java27
-rw-r--r--common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java2
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java8
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java7
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java21
-rw-r--r--common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java9
-rw-r--r--common/sketch/pom.xml2
-rw-r--r--common/unsafe/pom.xml2
-rw-r--r--common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java9
-rw-r--r--common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java88
13 files changed, 128 insertions, 177 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 7e7d78d42a..a6f527c118 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -32,8 +32,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,40 +131,36 @@ public class TransportClient implements Closeable {
*/
public void fetchChunk(
long streamId,
- final int chunkIndex,
- final ChunkReceivedCallback callback) {
- final long startTime = System.currentTimeMillis();
+ int chunkIndex,
+ ChunkReceivedCallback callback) {
+ long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
}
- final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
+ StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);
- channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request {} to {} took {} ms", streamChunkId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- handler.removeFetchRequest(streamChunkId);
- channel.close();
- try {
- callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback handler!", e);
- }
- }
+ channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
+ if (future.isSuccess()) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request {} to {} took {} ms", streamChunkId,
+ getRemoteAddress(channel), timeTaken);
}
- });
+ } else {
+ String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
+ getRemoteAddress(channel), future.cause());
+ logger.error(errorMsg, future.cause());
+ handler.removeFetchRequest(streamChunkId);
+ channel.close();
+ try {
+ callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
+ } catch (Exception e) {
+ logger.error("Uncaught exception in RPC response callback handler!", e);
+ }
+ }
+ });
}
/**
@@ -175,8 +169,8 @@ public class TransportClient implements Closeable {
* @param streamId The stream to fetch.
* @param callback Object to call with the stream data.
*/
- public void stream(final String streamId, final StreamCallback callback) {
- final long startTime = System.currentTimeMillis();
+ public void stream(String streamId, StreamCallback callback) {
+ long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
}
@@ -186,29 +180,25 @@ public class TransportClient implements Closeable {
// when responses arrive.
synchronized (this) {
handler.addStreamCallback(callback);
- channel.writeAndFlush(new StreamRequest(streamId)).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request for {} to {} took {} ms", streamId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- channel.close();
- try {
- callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback handler!", e);
- }
- }
+ channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
+ if (future.isSuccess()) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request for {} to {} took {} ms", streamId,
+ getRemoteAddress(channel), timeTaken);
}
- });
+ } else {
+ String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
+ getRemoteAddress(channel), future.cause());
+ logger.error(errorMsg, future.cause());
+ channel.close();
+ try {
+ callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
+ } catch (Exception e) {
+ logger.error("Uncaught exception in RPC response callback handler!", e);
+ }
+ }
+ });
}
}
@@ -220,19 +210,17 @@ public class TransportClient implements Closeable {
* @param callback Callback to handle the RPC's reply.
* @return The RPC's id.
*/
- public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
- final long startTime = System.currentTimeMillis();
+ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
+ long startTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}
- final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+ long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);
- channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
+ channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
+ .addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
@@ -251,8 +239,7 @@ public class TransportClient implements Closeable {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
- }
- });
+ });
return requestId;
}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
index 980525dbf0..799f4540aa 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
@@ -20,12 +20,7 @@ package org.apache.spark.network.crypto;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
-import java.security.Key;
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -37,7 +32,6 @@ import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.sasl.SaslClientBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
/**
@@ -103,20 +97,18 @@ public class AuthClientBootstrap implements TransportClientBootstrap {
private void doSparkAuth(TransportClient client, Channel channel)
throws GeneralSecurityException, IOException {
- AuthEngine engine = new AuthEngine(authUser, secretKeyHolder.getSecretKey(authUser), conf);
- try {
+ String secretKey = secretKeyHolder.getSecretKey(authUser);
+ try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) {
ClientChallenge challenge = engine.challenge();
ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
challenge.encode(challengeData);
- ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(),
- conf.authRTTimeoutMs());
+ ByteBuffer responseData =
+ client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
ServerResponse response = ServerResponse.decodeMessage(responseData);
engine.validate(response);
engine.sessionCipher().addToChannel(channel);
- } finally {
- engine.close();
}
}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
index 991d8ba95f..0a5c029940 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
@@ -17,9 +17,7 @@
package org.apache.spark.network.crypto;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import javax.security.sasl.Sasl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
@@ -35,7 +33,6 @@ import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.sasl.SaslRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
-import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
/**
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 900e8eb255..8193bc1376 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,21 +187,16 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
* Responds to a single message with some Encodable object. If a failure occurs while sending,
* it will be logged and the channel closed.
*/
- private void respond(final Encodable result) {
- final SocketAddress remoteAddress = channel.remoteAddress();
- channel.writeAndFlush(result).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- logger.trace("Sent result {} to client {}", result, remoteAddress);
- } else {
- logger.error(String.format("Error sending result %s to %s; closing connection",
- result, remoteAddress), future.cause());
- channel.close();
- }
- }
+ private void respond(Encodable result) {
+ SocketAddress remoteAddress = channel.remoteAddress();
+ channel.writeAndFlush(result).addListener(future -> {
+ if (future.isSuccess()) {
+ logger.trace("Sent result {} to client {}", result, remoteAddress);
+ } else {
+ logger.error(String.format("Error sending result %s to %s; closing connection",
+ result, remoteAddress), future.cause());
+ channel.close();
}
- );
+ });
}
}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
index 9a186f2113..a3519fe4a4 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
@@ -18,10 +18,8 @@
package org.apache.spark.network.crypto;
import java.util.Arrays;
-import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.collect.ImmutableMap;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 6e02430a8e..6daf9609d7 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -190,12 +190,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
- allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return blockManager.getRegisteredExecutorsSize();
- }
- });
+ allMetrics.put("registeredExecutorsSize",
+ (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
}
@Override
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 25e9abde70..62d58aba4c 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -205,12 +205,7 @@ public class ExternalShuffleBlockResolver {
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
// Execute the actual deletion in a different thread, as it may take some time.
- directoryCleaner.execute(new Runnable() {
- @Override
- public void run() {
- deleteExecutorDirs(executor.localDirs);
- }
- });
+ directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
}
}
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 8c0c400966..2c5827bf7d 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -82,23 +82,19 @@ public class ExternalShuffleClient extends ShuffleClient {
@Override
public void fetchBlocks(
- final String host,
- final int port,
- final String execId,
+ String host,
+ int port,
+ String execId,
String[] blockIds,
BlockFetchingListener listener) {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
- new RetryingBlockFetcher.BlockFetchStarter() {
- @Override
- public void createAndStart(String[] blockIds, BlockFetchingListener listener)
- throws IOException, InterruptedException {
+ (blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
- new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
- }
- };
+ new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start();
+ };
int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
@@ -131,12 +127,9 @@ public class ExternalShuffleClient extends ShuffleClient {
String execId,
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
- TransportClient client = clientFactory.createUnmanagedClient(host, port);
- try {
+ try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
- } finally {
- client.close();
}
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 5be855048e..f309dda8af 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -164,12 +164,9 @@ public class RetryingBlockFetcher {
logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
- fetchAllOutstanding();
- }
+ executorService.submit(() -> {
+ Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
+ fetchAllOutstanding();
});
}
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index bcd26d4352..1356c4723b 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -61,6 +61,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
@@ -71,6 +72,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
<configuration>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index dc19f4ad5f..f03a4da5e7 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -98,6 +98,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
@@ -108,6 +109,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
<configuration>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 671b8c7475..f13c24ae5e 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -162,14 +162,9 @@ public final class Platform {
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
- final long memory = allocateMemory(size);
+ long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
- Cleaner cleaner = Cleaner.create(buffer, new Runnable() {
- @Override
- public void run() {
- freeMemory(memory);
- }
- });
+ Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
cleanerField.set(buffer, cleaner);
return buffer;
} catch (Exception e) {
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index fd6e95c3e0..621f2c6bf3 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -178,48 +178,52 @@ public final class CalendarInterval implements Serializable {
"Interval string does not match day-time format of 'd h:m:s.n': " + s);
} else {
try {
- if (unit.equals("year")) {
- int year = (int) toLongWithRange("year", m.group(1),
- Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12);
- result = new CalendarInterval(year * 12, 0L);
-
- } else if (unit.equals("month")) {
- int month = (int) toLongWithRange("month", m.group(1),
- Integer.MIN_VALUE, Integer.MAX_VALUE);
- result = new CalendarInterval(month, 0L);
-
- } else if (unit.equals("week")) {
- long week = toLongWithRange("week", m.group(1),
- Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK);
- result = new CalendarInterval(0, week * MICROS_PER_WEEK);
-
- } else if (unit.equals("day")) {
- long day = toLongWithRange("day", m.group(1),
- Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY);
- result = new CalendarInterval(0, day * MICROS_PER_DAY);
-
- } else if (unit.equals("hour")) {
- long hour = toLongWithRange("hour", m.group(1),
- Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR);
- result = new CalendarInterval(0, hour * MICROS_PER_HOUR);
-
- } else if (unit.equals("minute")) {
- long minute = toLongWithRange("minute", m.group(1),
- Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE);
- result = new CalendarInterval(0, minute * MICROS_PER_MINUTE);
-
- } else if (unit.equals("second")) {
- long micros = parseSecondNano(m.group(1));
- result = new CalendarInterval(0, micros);
-
- } else if (unit.equals("millisecond")) {
- long millisecond = toLongWithRange("millisecond", m.group(1),
- Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI);
- result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI);
-
- } else if (unit.equals("microsecond")) {
- long micros = Long.parseLong(m.group(1));
- result = new CalendarInterval(0, micros);
+ switch (unit) {
+ case "year":
+ int year = (int) toLongWithRange("year", m.group(1),
+ Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12);
+ result = new CalendarInterval(year * 12, 0L);
+ break;
+ case "month":
+ int month = (int) toLongWithRange("month", m.group(1),
+ Integer.MIN_VALUE, Integer.MAX_VALUE);
+ result = new CalendarInterval(month, 0L);
+ break;
+ case "week":
+ long week = toLongWithRange("week", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK);
+ result = new CalendarInterval(0, week * MICROS_PER_WEEK);
+ break;
+ case "day":
+ long day = toLongWithRange("day", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY);
+ result = new CalendarInterval(0, day * MICROS_PER_DAY);
+ break;
+ case "hour":
+ long hour = toLongWithRange("hour", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR);
+ result = new CalendarInterval(0, hour * MICROS_PER_HOUR);
+ break;
+ case "minute":
+ long minute = toLongWithRange("minute", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE);
+ result = new CalendarInterval(0, minute * MICROS_PER_MINUTE);
+ break;
+ case "second": {
+ long micros = parseSecondNano(m.group(1));
+ result = new CalendarInterval(0, micros);
+ break;
+ }
+ case "millisecond":
+ long millisecond = toLongWithRange("millisecond", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI);
+ result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI);
+ break;
+ case "microsecond": {
+ long micros = Long.parseLong(m.group(1));
+ result = new CalendarInterval(0, micros);
+ break;
+ }
}
} catch (Exception e) {
throw new IllegalArgumentException("Error parsing interval string: " + e.getMessage(), e);