From 76d74090d60f74412bd45487e8db6aff2e8343a2 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 4 Aug 2015 12:02:26 +0100 Subject: [SPARK-9534] [BUILD] Enable javac lint for scalac parity; fix a lot of build warnings, 1.5.0 edition Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process. I'll explain several of the changes inline in comments. Author: Sean Owen Closes #7862 from srowen/SPARK-9534 and squashes the following commits: ea51618 [Sean Owen] Enable most javac lint warnings; fix a lot of build warnings. In a few cases, touch up surrounding code in the process. --- .../java/JavaSparkContextVarargsWorkaround.java | 19 ++++--- .../apache/spark/storage/TachyonBlockManager.scala | 9 +++- .../deploy/master/PersistenceEngineSuite.scala | 13 ++--- .../cluster/mesos/MesosSchedulerUtilsSuite.scala | 3 ++ .../spark/examples/ml/JavaOneVsRestExample.java | 1 + .../streaming/JavaStatefulNetworkWordCount.java | 4 +- .../kafka/JavaDirectKafkaStreamSuite.java | 2 +- .../mllib/evaluation/JavaRankingMetricsSuite.java | 14 ++--- .../spark/ml/classification/NaiveBayesSuite.scala | 4 +- .../spark/network/protocol/ChunkFetchFailure.java | 5 ++ .../spark/network/protocol/ChunkFetchRequest.java | 5 ++ .../spark/network/protocol/ChunkFetchSuccess.java | 5 ++ .../apache/spark/network/protocol/RpcFailure.java | 5 ++ .../apache/spark/network/protocol/RpcRequest.java | 5 ++ .../apache/spark/network/protocol/RpcResponse.java | 5 ++ .../apache/spark/network/TestManagedBuffer.java | 5 ++ .../apache/spark/network/sasl/SparkSaslSuite.java | 16 +++--- .../shuffle/ExternalShuffleBlockHandlerSuite.java | 6 ++- .../network/shuffle/RetryingBlockFetcherSuite.java | 47 ++++++++-------- pom.xml | 4 ++ .../org/apache/spark/sql/JavaDataFrameSuite.java | 1 + .../apache/spark/sql/sources/TableScanSuite.scala | 16 +++--- .../sql/hive/client/IsolatedClientLoader.scala | 2 +- .../apache/spark/sql/hive/execution/commands.scala | 2 +- .../apache/spark/sql/hive/JavaDataFrameSuite.java | 2 +- .../spark/sql/hive/execution/SQLQuerySuite.scala | 4 +- .../spark/streaming/scheduler/JobScheduler.scala | 4 +- .../spark/streaming/JavaWriteAheadLogSuite.java | 62 +++++++++++----------- .../apache/spark/streaming/UISeleniumSuite.scala | 4 +- 29 files changed, 167 insertions(+), 107 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java index 2090efd3b9..d4c42b38ac 100644 --- a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java +++ b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java @@ -23,11 +23,13 @@ import java.util.List; // See // http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html abstract class JavaSparkContextVarargsWorkaround { - public JavaRDD union(JavaRDD... rdds) { + + @SafeVarargs + public final JavaRDD union(JavaRDD... 
rdds) { if (rdds.length == 0) { throw new IllegalArgumentException("Union called on empty list"); } - ArrayList> rest = new ArrayList>(rdds.length - 1); + List> rest = new ArrayList<>(rdds.length - 1); for (int i = 1; i < rdds.length; i++) { rest.add(rdds[i]); } @@ -38,18 +40,19 @@ abstract class JavaSparkContextVarargsWorkaround { if (rdds.length == 0) { throw new IllegalArgumentException("Union called on empty list"); } - ArrayList rest = new ArrayList(rdds.length - 1); + List rest = new ArrayList<>(rdds.length - 1); for (int i = 1; i < rdds.length; i++) { rest.add(rdds[i]); } return union(rdds[0], rest); } - public JavaPairRDD union(JavaPairRDD... rdds) { + @SafeVarargs + public final JavaPairRDD union(JavaPairRDD... rdds) { if (rdds.length == 0) { throw new IllegalArgumentException("Union called on empty list"); } - ArrayList> rest = new ArrayList>(rdds.length - 1); + List> rest = new ArrayList<>(rdds.length - 1); for (int i = 1; i < rdds.length; i++) { rest.add(rdds[i]); } @@ -57,7 +60,7 @@ abstract class JavaSparkContextVarargsWorkaround { } // These methods take separate "first" and "rest" elements to avoid having the same type erasure - abstract public JavaRDD union(JavaRDD first, List> rest); - abstract public JavaDoubleRDD union(JavaDoubleRDD first, List rest); - abstract public JavaPairRDD union(JavaPairRDD first, List> rest); + public abstract JavaRDD union(JavaRDD first, List> rest); + public abstract JavaDoubleRDD union(JavaDoubleRDD first, List rest); + public abstract JavaPairRDD union(JavaPairRDD first, List> rest); } diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index b53c86e89a..ebad5bc5ab 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -27,9 +27,10 @@ import scala.util.control.NonFatal import com.google.common.io.ByteStreams import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile} +import tachyon.conf.TachyonConf import tachyon.TachyonURI -import org.apache.spark.{SparkException, SparkConf, Logging} +import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.util.Utils @@ -60,7 +61,11 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log rootDirs = s"$storeDir/$appFolderName/$executorId" master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998") - client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null + client = if (master != null && master != "") { + TachyonFS.get(new TachyonURI(master), new TachyonConf()) + } else { + null + } // original implementation call System.exit, we change it to run without extblkstore support if (client == null) { logError("Failed to connect to the Tachyon as the master address is not configured") diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index 11e87bd1dd..34775577de 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -73,11 +73,11 @@ class PersistenceEngineSuite extends SparkFunSuite { assert(persistenceEngine.read[String]("test_").isEmpty) // Test deserializing objects that contain RpcEndpointRef - val 
rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf)) + val testRpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf)) try { // Create a real endpoint so that we can test RpcEndpointRef deserialization - val workerEndpoint = rpcEnv.setupEndpoint("worker", new RpcEndpoint { - override val rpcEnv: RpcEnv = rpcEnv + val workerEndpoint = testRpcEnv.setupEndpoint("worker", new RpcEndpoint { + override val rpcEnv: RpcEnv = testRpcEnv }) val workerToPersist = new WorkerInfo( @@ -93,7 +93,8 @@ class PersistenceEngineSuite extends SparkFunSuite { persistenceEngine.addWorker(workerToPersist) - val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv) + val (storedApps, storedDrivers, storedWorkers) = + persistenceEngine.readPersistedData(testRpcEnv) assert(storedApps.isEmpty) assert(storedDrivers.isEmpty) @@ -110,8 +111,8 @@ class PersistenceEngineSuite extends SparkFunSuite { assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort) assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress) } finally { - rpcEnv.shutdown() - rpcEnv.awaitTermination() + testRpcEnv.shutdown() + testRpcEnv.awaitTermination() } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index b354914b6f..2eb43b7313 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.scheduler.cluster.mesos +import scala.language.reflectiveCalls + import org.apache.mesos.Protos.Value import org.mockito.Mockito._ import org.scalatest._ import org.scalatest.mock.MockitoSugar + import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar { diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java index 75063dbf80..e7f2f6f615 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java @@ -178,6 +178,7 @@ public class JavaOneVsRestExample { return params; } + @SuppressWarnings("static") private static Options generateCommandlineOptions() { Option input = OptionBuilder.withArgName("input") .hasArg() diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index dbf2ef02d7..02f58f48b0 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -85,7 +85,7 @@ public class JavaStatefulNetworkWordCount { @SuppressWarnings("unchecked") List> tuples = Arrays.asList(new Tuple2("hello", 1), new Tuple2("world", 1)); - JavaPairRDD initialRDD = ssc.sc().parallelizePairs(tuples); + JavaPairRDD initialRDD = ssc.sparkContext().parallelizePairs(tuples); JavaReceiverInputDStream lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); @@ -107,7 +107,7 @@ public class 
JavaStatefulNetworkWordCount { // This will give a Dstream made of state (which is the cumulative count of the words) JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction, - new HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); + new HashPartitioner(ssc.sparkContext().defaultParallelism()), initialRDD); stateDstream.print(); ssc.start(); diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index 02cd24a359..9db07d0507 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -70,7 +70,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable { final String topic1 = "topic1"; final String topic2 = "topic2"; // hold a reference to the current offset ranges, so it can be used downstream - final AtomicReference offsetRanges = new AtomicReference(); + final AtomicReference offsetRanges = new AtomicReference<>(); String[] topic1data = createTopicAndSendData(topic1); String[] topic2data = createTopicAndSendData(topic2); diff --git a/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java index effc8a1a6d..fa4d334801 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java @@ -18,12 +18,12 @@ package org.apache.spark.mllib.evaluation; import java.io.Serializable; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import scala.Tuple2; import scala.Tuple2$; -import com.google.common.collect.Lists; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -34,18 +34,18 @@ import org.apache.spark.api.java.JavaSparkContext; public class JavaRankingMetricsSuite implements Serializable { private transient JavaSparkContext sc; - private transient JavaRDD, ArrayList>> predictionAndLabels; + private transient JavaRDD, List>> predictionAndLabels; @Before public void setUp() { sc = new JavaSparkContext("local", "JavaRankingMetricsSuite"); - predictionAndLabels = sc.parallelize(Lists.newArrayList( + predictionAndLabels = sc.parallelize(Arrays.asList( Tuple2$.MODULE$.apply( - Lists.newArrayList(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Lists.newArrayList(1, 2, 3, 4, 5)), + Arrays.asList(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Arrays.asList(1, 2, 3, 4, 5)), Tuple2$.MODULE$.apply( - Lists.newArrayList(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Lists.newArrayList(1, 2, 3)), + Arrays.asList(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Arrays.asList(1, 2, 3)), Tuple2$.MODULE$.apply( - Lists.newArrayList(1, 2, 3, 4, 5), Lists.newArrayList())), 2); + Arrays.asList(1, 2, 3, 4, 5), Arrays.asList())), 2); } @After diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala index aea3d9b694..98bc951116 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala @@ -21,7 +21,7 @@ import breeze.linalg.{Vector => BV} import org.apache.spark.SparkFunSuite import org.apache.spark.ml.param.ParamsSuite -import 
org.apache.spark.mllib.classification.NaiveBayes +import org.apache.spark.mllib.classification.NaiveBayes.{Multinomial, Bernoulli} import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -31,8 +31,6 @@ import org.apache.spark.sql.Row class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext { - import NaiveBayes.{Multinomial, Bernoulli} - def validatePrediction(predictionAndLabels: DataFrame): Unit = { val numOfErrorPredictions = predictionAndLabels.collect().count { case Row(prediction: Double, label: Double) => diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java index f76bb49e87..f0363830b6 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java @@ -52,6 +52,11 @@ public final class ChunkFetchFailure implements ResponseMessage { return new ChunkFetchFailure(streamChunkId, errorString); } + @Override + public int hashCode() { + return Objects.hashCode(streamChunkId, errorString); + } + @Override public boolean equals(Object other) { if (other instanceof ChunkFetchFailure) { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java index 980947cf13..5a173af54f 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java @@ -48,6 +48,11 @@ public final class ChunkFetchRequest implements RequestMessage { return new ChunkFetchRequest(StreamChunkId.decode(buf)); } + @Override + public int hashCode() { + return streamChunkId.hashCode(); + } + @Override public boolean equals(Object other) { if (other instanceof ChunkFetchRequest) { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java index ff4936470c..c962fb7ecf 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java @@ -61,6 +61,11 @@ public final class ChunkFetchSuccess implements ResponseMessage { return new ChunkFetchSuccess(streamChunkId, managedBuf); } + @Override + public int hashCode() { + return Objects.hashCode(streamChunkId, buffer); + } + @Override public boolean equals(Object other) { if (other instanceof ChunkFetchSuccess) { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java index 6b991375fc..2dfc7876ba 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java @@ -50,6 +50,11 @@ public final class RpcFailure implements ResponseMessage { return new RpcFailure(requestId, errorString); } + @Override + public int hashCode() { + return Objects.hashCode(requestId, errorString); + } + @Override public boolean equals(Object other) { if (other instanceof RpcFailure) { diff --git 
a/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java index cdee0b0e03..745039db74 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java @@ -59,6 +59,11 @@ public final class RpcRequest implements RequestMessage { return new RpcRequest(requestId, message); } + @Override + public int hashCode() { + return Objects.hashCode(requestId, Arrays.hashCode(message)); + } + @Override public boolean equals(Object other) { if (other instanceof RpcRequest) { diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java index 0a62e09a81..1671cd444f 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java @@ -50,6 +50,11 @@ public final class RpcResponse implements ResponseMessage { return new RpcResponse(requestId, response); } + @Override + public int hashCode() { + return Objects.hashCode(requestId, Arrays.hashCode(response)); + } + @Override public boolean equals(Object other) { if (other instanceof RpcResponse) { diff --git a/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java b/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java index 38113a918f..83c90f9eff 100644 --- a/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java +++ b/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java @@ -80,6 +80,11 @@ public class TestManagedBuffer extends ManagedBuffer { return underlying.convertToNetty(); } + @Override + public int hashCode() { + return underlying.hashCode(); + } + @Override public boolean equals(Object other) { if (other instanceof ManagedBuffer) { diff --git a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java index be6632bb8c..8104004847 100644 --- a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java +++ b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java @@ -17,11 +17,11 @@ package org.apache.spark.network.sasl; -import static com.google.common.base.Charsets.UTF_8; import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.io.File; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Random; @@ -138,8 +138,8 @@ public class SparkSaslSuite { public Void answer(InvocationOnMock invocation) { byte[] message = (byte[]) invocation.getArguments()[1]; RpcResponseCallback cb = (RpcResponseCallback) invocation.getArguments()[2]; - assertEquals("Ping", new String(message, UTF_8)); - cb.onSuccess("Pong".getBytes(UTF_8)); + assertEquals("Ping", new String(message, StandardCharsets.UTF_8)); + cb.onSuccess("Pong".getBytes(StandardCharsets.UTF_8)); return null; } }) @@ -148,8 +148,9 @@ public class SparkSaslSuite { SaslTestCtx ctx = new SaslTestCtx(rpcHandler, encrypt, false); try { - byte[] response = ctx.client.sendRpcSync("Ping".getBytes(UTF_8), TimeUnit.SECONDS.toMillis(10)); - assertEquals("Pong", new String(response, UTF_8)); + byte[] response = ctx.client.sendRpcSync("Ping".getBytes(StandardCharsets.UTF_8), + 
TimeUnit.SECONDS.toMillis(10)); + assertEquals("Pong", new String(response, StandardCharsets.UTF_8)); } finally { ctx.close(); } @@ -235,7 +236,7 @@ public class SparkSaslSuite { final String blockSizeConf = "spark.network.sasl.maxEncryptedBlockSize"; System.setProperty(blockSizeConf, "1k"); - final AtomicReference response = new AtomicReference(); + final AtomicReference response = new AtomicReference<>(); final File file = File.createTempFile("sasltest", ".txt"); SaslTestCtx ctx = null; try { @@ -321,7 +322,8 @@ public class SparkSaslSuite { SaslTestCtx ctx = null; try { ctx = new SaslTestCtx(mock(RpcHandler.class), true, true); - ctx.client.sendRpcSync("Ping".getBytes(UTF_8), TimeUnit.SECONDS.toMillis(10)); + ctx.client.sendRpcSync("Ping".getBytes(StandardCharsets.UTF_8), + TimeUnit.SECONDS.toMillis(10)); fail("Should have failed to send RPC to server."); } catch (Exception e) { assertFalse(e.getCause() instanceof TimeoutException); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java index 73374cdc77..1d197497b7 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java @@ -90,9 +90,11 @@ public class ExternalShuffleBlockHandlerSuite { (StreamHandle) BlockTransferMessage.Decoder.fromByteArray(response.getValue()); assertEquals(2, handle.numChunks); - ArgumentCaptor stream = ArgumentCaptor.forClass(Iterator.class); + @SuppressWarnings("unchecked") + ArgumentCaptor> stream = (ArgumentCaptor>) + (ArgumentCaptor) ArgumentCaptor.forClass(Iterator.class); verify(streamManager, times(1)).registerStream(stream.capture()); - Iterator buffers = (Iterator) stream.getValue(); + Iterator buffers = stream.getValue(); assertEquals(block0Marker, buffers.next()); assertEquals(block1Marker, buffers.next()); assertFalse(buffers.hasNext()); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index 1ad0d72ae5..06e46f9241 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -20,7 +20,9 @@ package org.apache.spark.network.shuffle; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableMap; @@ -67,13 +69,13 @@ public class RetryingBlockFetcherSuite { public void testNoFailures() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List> interactions = Arrays.asList( // Immediately return both blocks successfully. 
ImmutableMap.builder() .put("b0", block0) .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -86,13 +88,13 @@ public class RetryingBlockFetcherSuite { public void testUnrecoverableFailure() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List> interactions = Arrays.asList( // b0 throws a non-IOException error, so it will be failed without retry. ImmutableMap.builder() .put("b0", new RuntimeException("Ouch!")) .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -105,7 +107,7 @@ public class RetryingBlockFetcherSuite { public void testSingleIOExceptionOnFirst() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List> interactions = Arrays.asList( // IOException will cause a retry. Since b0 fails, we will retry both. ImmutableMap.builder() .put("b0", new IOException("Connection failed or something")) @@ -114,8 +116,8 @@ public class RetryingBlockFetcherSuite { ImmutableMap.builder() .put("b0", block0) .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -128,7 +130,7 @@ public class RetryingBlockFetcherSuite { public void testSingleIOExceptionOnSecond() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List> interactions = Arrays.asList( // IOException will cause a retry. Since b1 fails, we will not retry b0. ImmutableMap.builder() .put("b0", block0) @@ -136,8 +138,8 @@ public class RetryingBlockFetcherSuite { .build(), ImmutableMap.builder() .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -150,7 +152,7 @@ public class RetryingBlockFetcherSuite { public void testTwoIOExceptions() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List> interactions = Arrays.asList( // b0's IOException will trigger retry, b1's will be ignored. ImmutableMap.builder() .put("b0", new IOException()) @@ -164,8 +166,8 @@ public class RetryingBlockFetcherSuite { // b1 returns successfully within 2 retries. ImmutableMap.builder() .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -178,7 +180,7 @@ public class RetryingBlockFetcherSuite { public void testThreeIOExceptions() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List> interactions = Arrays.asList( // b0's IOException will trigger retry, b1's will be ignored. ImmutableMap.builder() .put("b0", new IOException()) @@ -196,8 +198,8 @@ public class RetryingBlockFetcherSuite { // This is not reached -- b1 has failed. ImmutableMap.builder() .put("b1", block1) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -210,7 +212,7 @@ public class RetryingBlockFetcherSuite { public void testRetryAndUnrecoverable() throws IOException { BlockFetchingListener listener = mock(BlockFetchingListener.class); - Map[] interactions = new Map[] { + List> interactions = Arrays.asList( // b0's IOException will trigger retry, subsequent messages will be ignored. ImmutableMap.builder() .put("b0", new IOException()) @@ -226,8 +228,8 @@ public class RetryingBlockFetcherSuite { // b2 succeeds in its last retry. 
ImmutableMap.builder() .put("b2", block2) - .build(), - }; + .build() + ); performInteractions(interactions, listener); @@ -248,7 +250,8 @@ public class RetryingBlockFetcherSuite { * subset of the original blocks in a second interaction. */ @SuppressWarnings("unchecked") - private void performInteractions(final Map[] interactions, BlockFetchingListener listener) + private static void performInteractions(List> interactions, + BlockFetchingListener listener) throws IOException { TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); diff --git a/pom.xml b/pom.xml index a958cec867..b4ee3ccb0b 100644 --- a/pom.xml +++ b/pom.xml @@ -1849,6 +1849,7 @@ ${java.version} -target ${java.version} + -Xlint:all,-serial,-path @@ -1862,6 +1863,9 @@ UTF-8 1024m true + + -Xlint:all,-serial,-path + diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 2c669bb59a..7302361ab9 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -167,6 +167,7 @@ public class JavaDataFrameSuite { for (int i = 0; i < result.length(); i++) { Assert.assertEquals(bean.getB()[i], result.apply(i)); } + @SuppressWarnings("unchecked") Seq outputBuffer = (Seq) first.getJavaMap(2).get("hello"); Assert.assertArrayEquals( bean.getC().get("hello"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index cfb03ff485..e34e0956d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -17,14 +17,12 @@ package org.apache.spark.sql.sources +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String class DefaultSource extends SimpleScanSource @@ -73,7 +71,7 @@ case class AllDataTypesScan( sqlContext.sparkContext.parallelize(from to to).map { i => Row( s"str_$i", - s"str_$i".getBytes(), + s"str_$i".getBytes(StandardCharsets.UTF_8), i % 2 == 0, i.toByte, i.toShort, @@ -83,7 +81,7 @@ case class AllDataTypesScan( i.toDouble, new java.math.BigDecimal(i), new java.math.BigDecimal(i), - new Date(1970, 1, 1), + Date.valueOf("1970-01-01"), new Timestamp(20000 + i), s"varchar_$i", Seq(i, i + 1), @@ -92,7 +90,7 @@ case class AllDataTypesScan( Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), Row(i, i.toString), Row(Seq(s"str_$i", s"str_${i + 1}"), - Row(Seq(new Date(1970, 1, i + 1))))) + Row(Seq(Date.valueOf(s"1970-01-${i + 1}"))))) } } } @@ -113,7 +111,7 @@ class TableScanSuite extends DataSourceTest { i.toDouble, new java.math.BigDecimal(i), new java.math.BigDecimal(i), - new Date(1970, 1, 1), + Date.valueOf("1970-01-01"), new Timestamp(20000 + i), s"varchar_$i", Seq(i, i + 1), @@ -121,7 +119,7 @@ class TableScanSuite extends DataSourceTest { Map(i -> i.toString), Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), Row(i, i.toString), - Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date(1970, 1, i + 1))))) + Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(Date.valueOf(s"1970-01-${i + 1}"))))) }.toSeq before { @@ -280,7 
+278,7 @@ class TableScanSuite extends DataSourceTest { sqlTest( "SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema", - (1 to 10).map(i => Row(Seq(new Date(1970, 1, i + 1)))).toSeq) + (1 to 10).map(i => Row(Seq(Date.valueOf(s"1970-01-${i + 1}")))).toSeq) test("Caching") { // Cached Query Execution diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index f58bc7d7a0..a7d5a99194 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -77,7 +77,7 @@ private[hive] object IsolatedClientLoader { // TODO: Remove copy logic. val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}") allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir)) - tempDir.listFiles().map(_.toURL) + tempDir.listFiles().map(_.toURI.toURL) } private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index a47f9a4feb..05a78930af 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -88,7 +88,7 @@ case class AddJar(path: String) extends RunnableCommand { val currentClassLoader = Utils.getContextOrSparkClassLoader // Add jar to current context - val jarURL = new java.io.File(path).toURL + val jarURL = new java.io.File(path).toURI.toURL val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader) Thread.currentThread.setContextClassLoader(newClassLoader) // We need to explicitly set the class loader associated with the conf in executionHive's diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java index 741a3cd31c..613b2bcc80 100644 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java +++ b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java @@ -54,7 +54,7 @@ public class JavaDataFrameSuite { for (int i = 0; i < 10; i++) { jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}"); } - df = hc.jsonRDD(sc.parallelize(jsonObjects)); + df = hc.read().json(sc.parallelize(jsonObjects)); df.registerTempTable("window_table"); } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 95c1da6e97..fb41451803 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -660,7 +660,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { test("resolve udtf in projection #2") { val rdd = sparkContext.makeRDD((1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}""")) - jsonRDD(rdd).registerTempTable("data") + read.json(rdd).registerTempTable("data") checkAnswer(sql("SELECT explode(map(1, 1)) FROM data LIMIT 1"), Row(1, 1) :: Nil) checkAnswer(sql("SELECT explode(map(1, 1)) as (k1, k2) FROM data LIMIT 1"), Row(1, 1) :: Nil) intercept[AnalysisException] { @@ -675,7 +675,7 @@ class SQLQuerySuite extends 
QueryTest with SQLTestUtils { // TGF with non-TGF in project is allowed in Spark SQL, but not in Hive test("TGF with non-TGF in projection") { val rdd = sparkContext.makeRDD( """{"a": "1", "b":"1"}""" :: Nil) - jsonRDD(rdd).registerTempTable("data") + read.json(rdd).registerTempTable("data") checkAnswer( sql("SELECT explode(map(a, b)) as (k1, k2), a, b FROM data"), Row("1", "1", "1", "1") :: Nil) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 58bdda7794..7e735562dc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -40,7 +40,9 @@ private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends J private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { - private val jobSets = new ConcurrentHashMap[Time, JobSet] + // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff + // https://gist.github.com/AlainODea/1375759b8720a3f9f094 + private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs) private val jobGenerator = new JobGenerator(this) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index 50e8f9fc15..175b8a496b 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -17,13 +17,15 @@ package org.apache.spark.streaming; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collection; +import java.util.Iterator; +import java.util.List; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.Transformer; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; import org.apache.spark.SparkConf; import org.apache.spark.streaming.util.WriteAheadLog; import org.apache.spark.streaming.util.WriteAheadLogRecordHandle; @@ -32,40 +34,40 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils; import org.junit.Test; import org.junit.Assert; -class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle { - int index = -1; - public JavaWriteAheadLogSuiteHandle(int idx) { - index = idx; - } -} - public class JavaWriteAheadLogSuite extends WriteAheadLog { - class Record { + static class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle { + int index = -1; + JavaWriteAheadLogSuiteHandle(int idx) { + index = idx; + } + } + + static class Record { long time; int index; ByteBuffer buffer; - public Record(long tym, int idx, ByteBuffer buf) { + Record(long tym, int idx, ByteBuffer buf) { index = idx; time = tym; buffer = buf; } } private int index = -1; - private ArrayList records = new ArrayList(); + private final List records = new ArrayList<>(); // Methods for WriteAheadLog @Override - public WriteAheadLogRecordHandle write(java.nio.ByteBuffer record, long time) { + public WriteAheadLogRecordHandle write(ByteBuffer record, long time) { index += 1; - records.add(new 
org.apache.spark.streaming.JavaWriteAheadLogSuite.Record(time, index, record)); + records.add(new Record(time, index, record)); return new JavaWriteAheadLogSuiteHandle(index); } @Override - public java.nio.ByteBuffer read(WriteAheadLogRecordHandle handle) { + public ByteBuffer read(WriteAheadLogRecordHandle handle) { if (handle instanceof JavaWriteAheadLogSuiteHandle) { int reqdIndex = ((JavaWriteAheadLogSuiteHandle) handle).index; for (Record record: records) { @@ -78,14 +80,13 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { } @Override - public java.util.Iterator readAll() { - Collection buffers = CollectionUtils.collect(records, new Transformer() { + public Iterator readAll() { + return Iterators.transform(records.iterator(), new Function() { @Override - public Object transform(Object input) { - return ((Record) input).buffer; + public ByteBuffer apply(Record input) { + return input.buffer; } }); - return buffers.iterator(); } @Override @@ -110,20 +111,21 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null); String data1 = "data1"; - WriteAheadLogRecordHandle handle = wal.write(ByteBuffer.wrap(data1.getBytes()), 1234); + WriteAheadLogRecordHandle handle = + wal.write(ByteBuffer.wrap(data1.getBytes(StandardCharsets.UTF_8)), 1234); Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle); - Assert.assertTrue(new String(wal.read(handle).array()).equals(data1)); + Assert.assertEquals(new String(wal.read(handle).array(), StandardCharsets.UTF_8), data1); - wal.write(ByteBuffer.wrap("data2".getBytes()), 1235); - wal.write(ByteBuffer.wrap("data3".getBytes()), 1236); - wal.write(ByteBuffer.wrap("data4".getBytes()), 1237); + wal.write(ByteBuffer.wrap("data2".getBytes(StandardCharsets.UTF_8)), 1235); + wal.write(ByteBuffer.wrap("data3".getBytes(StandardCharsets.UTF_8)), 1236); + wal.write(ByteBuffer.wrap("data4".getBytes(StandardCharsets.UTF_8)), 1237); wal.clean(1236, false); - java.util.Iterator dataIterator = wal.readAll(); - ArrayList readData = new ArrayList(); + Iterator dataIterator = wal.readAll(); + List readData = new ArrayList<>(); while (dataIterator.hasNext()) { - readData.add(new String(dataIterator.next().array())); + readData.add(new String(dataIterator.next().array(), StandardCharsets.UTF_8)); } - Assert.assertTrue(readData.equals(Arrays.asList("data3", "data4"))); + Assert.assertEquals(readData, Arrays.asList("data3", "data4")); } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index a08578680c..068a6cb0e8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -100,8 +100,8 @@ class UISeleniumSuite // Check stat table val statTableHeaders = findAll(cssSelector("#stat-table th")).map(_.text).toSeq statTableHeaders.exists( - _.matches("Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)")) should be - (true) + _.matches("Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)") + ) should be (true) statTableHeaders should contain ("Histograms") val statTableCells = findAll(cssSelector("#stat-table td")).map(_.text).toSeq -- cgit v1.2.3