aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java19
-rw-r--r--core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala13
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala3
-rw-r--r--examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java1
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java4
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java2
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java14
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala4
-rw-r--r--network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java5
-rw-r--r--network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java5
-rw-r--r--network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java5
-rw-r--r--network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java5
-rw-r--r--network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java5
-rw-r--r--network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java5
-rw-r--r--network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java5
-rw-r--r--network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java16
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java6
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java47
-rw-r--r--pom.xml4
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java1
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala16
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala2
-rw-r--r--sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala4
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java62
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala4
29 files changed, 167 insertions, 107 deletions
diff --git a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
index 2090efd3b9..d4c42b38ac 100644
--- a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
@@ -23,11 +23,13 @@ import java.util.List;
// See
// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
abstract class JavaSparkContextVarargsWorkaround {
- public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
+
+ @SafeVarargs
+ public final <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
if (rdds.length == 0) {
throw new IllegalArgumentException("Union called on empty list");
}
- ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
+ List<JavaRDD<T>> rest = new ArrayList<>(rdds.length - 1);
for (int i = 1; i < rdds.length; i++) {
rest.add(rdds[i]);
}
@@ -38,18 +40,19 @@ abstract class JavaSparkContextVarargsWorkaround {
if (rdds.length == 0) {
throw new IllegalArgumentException("Union called on empty list");
}
- ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
+ List<JavaDoubleRDD> rest = new ArrayList<>(rdds.length - 1);
for (int i = 1; i < rdds.length; i++) {
rest.add(rdds[i]);
}
return union(rdds[0], rest);
}
- public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
+ @SafeVarargs
+ public final <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
if (rdds.length == 0) {
throw new IllegalArgumentException("Union called on empty list");
}
- ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
+ List<JavaPairRDD<K, V>> rest = new ArrayList<>(rdds.length - 1);
for (int i = 1; i < rdds.length; i++) {
rest.add(rdds[i]);
}
@@ -57,7 +60,7 @@ abstract class JavaSparkContextVarargsWorkaround {
}
// These methods take separate "first" and "rest" elements to avoid having the same type erasure
- abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
- abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
- abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
+ public abstract <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
+ public abstract JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
+ public abstract <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index b53c86e89a..ebad5bc5ab 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -27,9 +27,10 @@ import scala.util.control.NonFatal
import com.google.common.io.ByteStreams
import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
+import tachyon.conf.TachyonConf
import tachyon.TachyonURI
-import org.apache.spark.{SparkException, SparkConf, Logging}
+import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.util.Utils
@@ -60,7 +61,11 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
rootDirs = s"$storeDir/$appFolderName/$executorId"
master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
- client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
+ client = if (master != null && master != "") {
+ TachyonFS.get(new TachyonURI(master), new TachyonConf())
+ } else {
+ null
+ }
// original implementation call System.exit, we change it to run without extblkstore support
if (client == null) {
logError("Failed to connect to the Tachyon as the master address is not configured")
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 11e87bd1dd..34775577de 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -73,11 +73,11 @@ class PersistenceEngineSuite extends SparkFunSuite {
assert(persistenceEngine.read[String]("test_").isEmpty)
// Test deserializing objects that contain RpcEndpointRef
- val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+ val testRpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
try {
// Create a real endpoint so that we can test RpcEndpointRef deserialization
- val workerEndpoint = rpcEnv.setupEndpoint("worker", new RpcEndpoint {
- override val rpcEnv: RpcEnv = rpcEnv
+ val workerEndpoint = testRpcEnv.setupEndpoint("worker", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = testRpcEnv
})
val workerToPersist = new WorkerInfo(
@@ -93,7 +93,8 @@ class PersistenceEngineSuite extends SparkFunSuite {
persistenceEngine.addWorker(workerToPersist)
- val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
+ val (storedApps, storedDrivers, storedWorkers) =
+ persistenceEngine.readPersistedData(testRpcEnv)
assert(storedApps.isEmpty)
assert(storedDrivers.isEmpty)
@@ -110,8 +111,8 @@ class PersistenceEngineSuite extends SparkFunSuite {
assert(workerToPersist.webUiPort === recoveryWorkerInfo.webUiPort)
assert(workerToPersist.publicAddress === recoveryWorkerInfo.publicAddress)
} finally {
- rpcEnv.shutdown()
- rpcEnv.awaitTermination()
+ testRpcEnv.shutdown()
+ testRpcEnv.awaitTermination()
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index b354914b6f..2eb43b7313 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -17,10 +17,13 @@
package org.apache.spark.scheduler.cluster.mesos
+import scala.language.reflectiveCalls
+
import org.apache.mesos.Protos.Value
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mock.MockitoSugar
+
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java
index 75063dbf80..e7f2f6f615 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java
@@ -178,6 +178,7 @@ public class JavaOneVsRestExample {
return params;
}
+ @SuppressWarnings("static")
private static Options generateCommandlineOptions() {
Option input = OptionBuilder.withArgName("input")
.hasArg()
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index dbf2ef02d7..02f58f48b0 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -85,7 +85,7 @@ public class JavaStatefulNetworkWordCount {
@SuppressWarnings("unchecked")
List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 1));
- JavaPairRDD<String, Integer> initialRDD = ssc.sc().parallelizePairs(tuples);
+ JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);
@@ -107,7 +107,7 @@ public class JavaStatefulNetworkWordCount {
// This will give a Dstream made of state (which is the cumulative count of the words)
JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction,
- new HashPartitioner(ssc.sc().defaultParallelism()), initialRDD);
+ new HashPartitioner(ssc.sparkContext().defaultParallelism()), initialRDD);
stateDstream.print();
ssc.start();
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 02cd24a359..9db07d0507 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -70,7 +70,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
final String topic1 = "topic1";
final String topic2 = "topic2";
// hold a reference to the current offset ranges, so it can be used downstream
- final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference();
+ final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
String[] topic1data = createTopicAndSendData(topic1);
String[] topic2data = createTopicAndSendData(topic2);
diff --git a/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java
index effc8a1a6d..fa4d334801 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/evaluation/JavaRankingMetricsSuite.java
@@ -18,12 +18,12 @@
package org.apache.spark.mllib.evaluation;
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import scala.Tuple2;
import scala.Tuple2$;
-import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -34,18 +34,18 @@ import org.apache.spark.api.java.JavaSparkContext;
public class JavaRankingMetricsSuite implements Serializable {
private transient JavaSparkContext sc;
- private transient JavaRDD<Tuple2<ArrayList<Integer>, ArrayList<Integer>>> predictionAndLabels;
+ private transient JavaRDD<Tuple2<List<Integer>, List<Integer>>> predictionAndLabels;
@Before
public void setUp() {
sc = new JavaSparkContext("local", "JavaRankingMetricsSuite");
- predictionAndLabels = sc.parallelize(Lists.newArrayList(
+ predictionAndLabels = sc.parallelize(Arrays.asList(
Tuple2$.MODULE$.apply(
- Lists.newArrayList(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Lists.newArrayList(1, 2, 3, 4, 5)),
+ Arrays.asList(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Arrays.asList(1, 2, 3, 4, 5)),
Tuple2$.MODULE$.apply(
- Lists.newArrayList(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Lists.newArrayList(1, 2, 3)),
+ Arrays.asList(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Arrays.asList(1, 2, 3)),
Tuple2$.MODULE$.apply(
- Lists.newArrayList(1, 2, 3, 4, 5), Lists.<Integer>newArrayList())), 2);
+ Arrays.asList(1, 2, 3, 4, 5), Arrays.<Integer>asList())), 2);
}
@After
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
index aea3d9b694..98bc951116 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala
@@ -21,7 +21,7 @@ import breeze.linalg.{Vector => BV}
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.mllib.classification.NaiveBayes
+import org.apache.spark.mllib.classification.NaiveBayes.{Multinomial, Bernoulli}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
@@ -31,8 +31,6 @@ import org.apache.spark.sql.Row
class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
- import NaiveBayes.{Multinomial, Bernoulli}
-
def validatePrediction(predictionAndLabels: DataFrame): Unit = {
val numOfErrorPredictions = predictionAndLabels.collect().count {
case Row(prediction: Double, label: Double) =>
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java
index f76bb49e87..f0363830b6 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java
@@ -53,6 +53,11 @@ public final class ChunkFetchFailure implements ResponseMessage {
}
@Override
+ public int hashCode() {
+ return Objects.hashCode(streamChunkId, errorString);
+ }
+
+ @Override
public boolean equals(Object other) {
if (other instanceof ChunkFetchFailure) {
ChunkFetchFailure o = (ChunkFetchFailure) other;
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java
index 980947cf13..5a173af54f 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java
@@ -49,6 +49,11 @@ public final class ChunkFetchRequest implements RequestMessage {
}
@Override
+ public int hashCode() {
+ return streamChunkId.hashCode();
+ }
+
+ @Override
public boolean equals(Object other) {
if (other instanceof ChunkFetchRequest) {
ChunkFetchRequest o = (ChunkFetchRequest) other;
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java
index ff4936470c..c962fb7ecf 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java
@@ -62,6 +62,11 @@ public final class ChunkFetchSuccess implements ResponseMessage {
}
@Override
+ public int hashCode() {
+ return Objects.hashCode(streamChunkId, buffer);
+ }
+
+ @Override
public boolean equals(Object other) {
if (other instanceof ChunkFetchSuccess) {
ChunkFetchSuccess o = (ChunkFetchSuccess) other;
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java
index 6b991375fc..2dfc7876ba 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java
@@ -51,6 +51,11 @@ public final class RpcFailure implements ResponseMessage {
}
@Override
+ public int hashCode() {
+ return Objects.hashCode(requestId, errorString);
+ }
+
+ @Override
public boolean equals(Object other) {
if (other instanceof RpcFailure) {
RpcFailure o = (RpcFailure) other;
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java
index cdee0b0e03..745039db74 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcRequest.java
@@ -60,6 +60,11 @@ public final class RpcRequest implements RequestMessage {
}
@Override
+ public int hashCode() {
+ return Objects.hashCode(requestId, Arrays.hashCode(message));
+ }
+
+ @Override
public boolean equals(Object other) {
if (other instanceof RpcRequest) {
RpcRequest o = (RpcRequest) other;
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java
index 0a62e09a81..1671cd444f 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java
@@ -51,6 +51,11 @@ public final class RpcResponse implements ResponseMessage {
}
@Override
+ public int hashCode() {
+ return Objects.hashCode(requestId, Arrays.hashCode(response));
+ }
+
+ @Override
public boolean equals(Object other) {
if (other instanceof RpcResponse) {
RpcResponse o = (RpcResponse) other;
diff --git a/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java b/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java
index 38113a918f..83c90f9eff 100644
--- a/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java
+++ b/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java
@@ -81,6 +81,11 @@ public class TestManagedBuffer extends ManagedBuffer {
}
@Override
+ public int hashCode() {
+ return underlying.hashCode();
+ }
+
+ @Override
public boolean equals(Object other) {
if (other instanceof ManagedBuffer) {
try {
diff --git a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index be6632bb8c..8104004847 100644
--- a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -17,11 +17,11 @@
package org.apache.spark.network.sasl;
-import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.File;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@@ -138,8 +138,8 @@ public class SparkSaslSuite {
public Void answer(InvocationOnMock invocation) {
byte[] message = (byte[]) invocation.getArguments()[1];
RpcResponseCallback cb = (RpcResponseCallback) invocation.getArguments()[2];
- assertEquals("Ping", new String(message, UTF_8));
- cb.onSuccess("Pong".getBytes(UTF_8));
+ assertEquals("Ping", new String(message, StandardCharsets.UTF_8));
+ cb.onSuccess("Pong".getBytes(StandardCharsets.UTF_8));
return null;
}
})
@@ -148,8 +148,9 @@ public class SparkSaslSuite {
SaslTestCtx ctx = new SaslTestCtx(rpcHandler, encrypt, false);
try {
- byte[] response = ctx.client.sendRpcSync("Ping".getBytes(UTF_8), TimeUnit.SECONDS.toMillis(10));
- assertEquals("Pong", new String(response, UTF_8));
+ byte[] response = ctx.client.sendRpcSync("Ping".getBytes(StandardCharsets.UTF_8),
+ TimeUnit.SECONDS.toMillis(10));
+ assertEquals("Pong", new String(response, StandardCharsets.UTF_8));
} finally {
ctx.close();
}
@@ -235,7 +236,7 @@ public class SparkSaslSuite {
final String blockSizeConf = "spark.network.sasl.maxEncryptedBlockSize";
System.setProperty(blockSizeConf, "1k");
- final AtomicReference<ManagedBuffer> response = new AtomicReference();
+ final AtomicReference<ManagedBuffer> response = new AtomicReference<>();
final File file = File.createTempFile("sasltest", ".txt");
SaslTestCtx ctx = null;
try {
@@ -321,7 +322,8 @@ public class SparkSaslSuite {
SaslTestCtx ctx = null;
try {
ctx = new SaslTestCtx(mock(RpcHandler.class), true, true);
- ctx.client.sendRpcSync("Ping".getBytes(UTF_8), TimeUnit.SECONDS.toMillis(10));
+ ctx.client.sendRpcSync("Ping".getBytes(StandardCharsets.UTF_8),
+ TimeUnit.SECONDS.toMillis(10));
fail("Should have failed to send RPC to server.");
} catch (Exception e) {
assertFalse(e.getCause() instanceof TimeoutException);
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index 73374cdc77..1d197497b7 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -90,9 +90,11 @@ public class ExternalShuffleBlockHandlerSuite {
(StreamHandle) BlockTransferMessage.Decoder.fromByteArray(response.getValue());
assertEquals(2, handle.numChunks);
- ArgumentCaptor<Iterator> stream = ArgumentCaptor.forClass(Iterator.class);
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>)
+ (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
verify(streamManager, times(1)).registerStream(stream.capture());
- Iterator<ManagedBuffer> buffers = (Iterator<ManagedBuffer>) stream.getValue();
+ Iterator<ManagedBuffer> buffers = stream.getValue();
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 1ad0d72ae5..06e46f9241 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -20,7 +20,9 @@ package org.apache.spark.network.shuffle;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
@@ -67,13 +69,13 @@ public class RetryingBlockFetcherSuite {
public void testNoFailures() throws IOException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
- Map[] interactions = new Map[] {
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
// Immediately return both blocks successfully.
ImmutableMap.<String, Object>builder()
.put("b0", block0)
.put("b1", block1)
- .build(),
- };
+ .build()
+ );
performInteractions(interactions, listener);
@@ -86,13 +88,13 @@ public class RetryingBlockFetcherSuite {
public void testUnrecoverableFailure() throws IOException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
- Map[] interactions = new Map[] {
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
// b0 throws a non-IOException error, so it will be failed without retry.
ImmutableMap.<String, Object>builder()
.put("b0", new RuntimeException("Ouch!"))
.put("b1", block1)
- .build(),
- };
+ .build()
+ );
performInteractions(interactions, listener);
@@ -105,7 +107,7 @@ public class RetryingBlockFetcherSuite {
public void testSingleIOExceptionOnFirst() throws IOException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
- Map[] interactions = new Map[] {
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
// IOException will cause a retry. Since b0 fails, we will retry both.
ImmutableMap.<String, Object>builder()
.put("b0", new IOException("Connection failed or something"))
@@ -114,8 +116,8 @@ public class RetryingBlockFetcherSuite {
ImmutableMap.<String, Object>builder()
.put("b0", block0)
.put("b1", block1)
- .build(),
- };
+ .build()
+ );
performInteractions(interactions, listener);
@@ -128,7 +130,7 @@ public class RetryingBlockFetcherSuite {
public void testSingleIOExceptionOnSecond() throws IOException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
- Map[] interactions = new Map[] {
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
// IOException will cause a retry. Since b1 fails, we will not retry b0.
ImmutableMap.<String, Object>builder()
.put("b0", block0)
@@ -136,8 +138,8 @@ public class RetryingBlockFetcherSuite {
.build(),
ImmutableMap.<String, Object>builder()
.put("b1", block1)
- .build(),
- };
+ .build()
+ );
performInteractions(interactions, listener);
@@ -150,7 +152,7 @@ public class RetryingBlockFetcherSuite {
public void testTwoIOExceptions() throws IOException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
- Map[] interactions = new Map[] {
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
// b0's IOException will trigger retry, b1's will be ignored.
ImmutableMap.<String, Object>builder()
.put("b0", new IOException())
@@ -164,8 +166,8 @@ public class RetryingBlockFetcherSuite {
// b1 returns successfully within 2 retries.
ImmutableMap.<String, Object>builder()
.put("b1", block1)
- .build(),
- };
+ .build()
+ );
performInteractions(interactions, listener);
@@ -178,7 +180,7 @@ public class RetryingBlockFetcherSuite {
public void testThreeIOExceptions() throws IOException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
- Map[] interactions = new Map[] {
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
// b0's IOException will trigger retry, b1's will be ignored.
ImmutableMap.<String, Object>builder()
.put("b0", new IOException())
@@ -196,8 +198,8 @@ public class RetryingBlockFetcherSuite {
// This is not reached -- b1 has failed.
ImmutableMap.<String, Object>builder()
.put("b1", block1)
- .build(),
- };
+ .build()
+ );
performInteractions(interactions, listener);
@@ -210,7 +212,7 @@ public class RetryingBlockFetcherSuite {
public void testRetryAndUnrecoverable() throws IOException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);
- Map[] interactions = new Map[] {
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
// b0's IOException will trigger retry, subsequent messages will be ignored.
ImmutableMap.<String, Object>builder()
.put("b0", new IOException())
@@ -226,8 +228,8 @@ public class RetryingBlockFetcherSuite {
// b2 succeeds in its last retry.
ImmutableMap.<String, Object>builder()
.put("b2", block2)
- .build(),
- };
+ .build()
+ );
performInteractions(interactions, listener);
@@ -248,7 +250,8 @@ public class RetryingBlockFetcherSuite {
* subset of the original blocks in a second interaction.
*/
@SuppressWarnings("unchecked")
- private void performInteractions(final Map[] interactions, BlockFetchingListener listener)
+ private static void performInteractions(List<? extends Map<String, Object>> interactions,
+ BlockFetchingListener listener)
throws IOException {
TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
diff --git a/pom.xml b/pom.xml
index a958cec867..b4ee3ccb0b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1849,6 +1849,7 @@
<javacArg>${java.version}</javacArg>
<javacArg>-target</javacArg>
<javacArg>${java.version}</javacArg>
+ <javacArg>-Xlint:all,-serial,-path</javacArg>
</javacArgs>
</configuration>
</plugin>
@@ -1862,6 +1863,9 @@
<encoding>UTF-8</encoding>
<maxmem>1024m</maxmem>
<fork>true</fork>
+ <compilerArgs>
+ <arg>-Xlint:all,-serial,-path</arg>
+ </compilerArgs>
</configuration>
</plugin>
<!-- Surefire runs all Java tests -->
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 2c669bb59a..7302361ab9 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -167,6 +167,7 @@ public class JavaDataFrameSuite {
for (int i = 0; i < result.length(); i++) {
Assert.assertEquals(bean.getB()[i], result.apply(i));
}
+ @SuppressWarnings("unchecked")
Seq<Integer> outputBuffer = (Seq<Integer>) first.getJavaMap(2).get("hello");
Assert.assertArrayEquals(
bean.getC().get("hello"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index cfb03ff485..e34e0956d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -17,14 +17,12 @@
package org.apache.spark.sql.sources
+import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
class DefaultSource extends SimpleScanSource
@@ -73,7 +71,7 @@ case class AllDataTypesScan(
sqlContext.sparkContext.parallelize(from to to).map { i =>
Row(
s"str_$i",
- s"str_$i".getBytes(),
+ s"str_$i".getBytes(StandardCharsets.UTF_8),
i % 2 == 0,
i.toByte,
i.toShort,
@@ -83,7 +81,7 @@ case class AllDataTypesScan(
i.toDouble,
new java.math.BigDecimal(i),
new java.math.BigDecimal(i),
- new Date(1970, 1, 1),
+ Date.valueOf("1970-01-01"),
new Timestamp(20000 + i),
s"varchar_$i",
Seq(i, i + 1),
@@ -92,7 +90,7 @@ case class AllDataTypesScan(
Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)),
Row(i, i.toString),
Row(Seq(s"str_$i", s"str_${i + 1}"),
- Row(Seq(new Date(1970, 1, i + 1)))))
+ Row(Seq(Date.valueOf(s"1970-01-${i + 1}")))))
}
}
}
@@ -113,7 +111,7 @@ class TableScanSuite extends DataSourceTest {
i.toDouble,
new java.math.BigDecimal(i),
new java.math.BigDecimal(i),
- new Date(1970, 1, 1),
+ Date.valueOf("1970-01-01"),
new Timestamp(20000 + i),
s"varchar_$i",
Seq(i, i + 1),
@@ -121,7 +119,7 @@ class TableScanSuite extends DataSourceTest {
Map(i -> i.toString),
Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)),
Row(i, i.toString),
- Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date(1970, 1, i + 1)))))
+ Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(Date.valueOf(s"1970-01-${i + 1}")))))
}.toSeq
before {
@@ -280,7 +278,7 @@ class TableScanSuite extends DataSourceTest {
sqlTest(
"SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema",
- (1 to 10).map(i => Row(Seq(new Date(1970, 1, i + 1)))).toSeq)
+ (1 to 10).map(i => Row(Seq(Date.valueOf(s"1970-01-${i + 1}")))).toSeq)
test("Caching") {
// Cached Query Execution
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index f58bc7d7a0..a7d5a99194 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -77,7 +77,7 @@ private[hive] object IsolatedClientLoader {
// TODO: Remove copy logic.
val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}")
allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
- tempDir.listFiles().map(_.toURL)
+ tempDir.listFiles().map(_.toURI.toURL)
}
private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index a47f9a4feb..05a78930af 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -88,7 +88,7 @@ case class AddJar(path: String) extends RunnableCommand {
val currentClassLoader = Utils.getContextOrSparkClassLoader
// Add jar to current context
- val jarURL = new java.io.File(path).toURL
+ val jarURL = new java.io.File(path).toURI.toURL
val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader)
Thread.currentThread.setContextClassLoader(newClassLoader)
// We need to explicitly set the class loader associated with the conf in executionHive's
diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java
index 741a3cd31c..613b2bcc80 100644
--- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java
+++ b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java
@@ -54,7 +54,7 @@ public class JavaDataFrameSuite {
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}");
}
- df = hc.jsonRDD(sc.parallelize(jsonObjects));
+ df = hc.read().json(sc.parallelize(jsonObjects));
df.registerTempTable("window_table");
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 95c1da6e97..fb41451803 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -660,7 +660,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils {
test("resolve udtf in projection #2") {
val rdd = sparkContext.makeRDD((1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}"""))
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
checkAnswer(sql("SELECT explode(map(1, 1)) FROM data LIMIT 1"), Row(1, 1) :: Nil)
checkAnswer(sql("SELECT explode(map(1, 1)) as (k1, k2) FROM data LIMIT 1"), Row(1, 1) :: Nil)
intercept[AnalysisException] {
@@ -675,7 +675,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils {
// TGF with non-TGF in project is allowed in Spark SQL, but not in Hive
test("TGF with non-TGF in projection") {
val rdd = sparkContext.makeRDD( """{"a": "1", "b":"1"}""" :: Nil)
- jsonRDD(rdd).registerTempTable("data")
+ read.json(rdd).registerTempTable("data")
checkAnswer(
sql("SELECT explode(map(a, b)) as (k1, k2), a, b FROM data"),
Row("1", "1", "1", "1") :: Nil)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 58bdda7794..7e735562dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -40,7 +40,9 @@ private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends J
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
- private val jobSets = new ConcurrentHashMap[Time, JobSet]
+ // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
+ // https://gist.github.com/AlainODea/1375759b8720a3f9f094
+ private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)
private val jobGenerator = new JobGenerator(this)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
index 50e8f9fc15..175b8a496b 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -17,13 +17,15 @@
package org.apache.spark.streaming;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.Transformer;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;
@@ -32,40 +34,40 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils;
import org.junit.Test;
import org.junit.Assert;
-class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle {
- int index = -1;
- public JavaWriteAheadLogSuiteHandle(int idx) {
- index = idx;
- }
-}
-
public class JavaWriteAheadLogSuite extends WriteAheadLog {
- class Record {
+ static class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle {
+ int index = -1;
+ JavaWriteAheadLogSuiteHandle(int idx) {
+ index = idx;
+ }
+ }
+
+ static class Record {
long time;
int index;
ByteBuffer buffer;
- public Record(long tym, int idx, ByteBuffer buf) {
+ Record(long tym, int idx, ByteBuffer buf) {
index = idx;
time = tym;
buffer = buf;
}
}
private int index = -1;
- private ArrayList<Record> records = new ArrayList<Record>();
+ private final List<Record> records = new ArrayList<>();
// Methods for WriteAheadLog
@Override
- public WriteAheadLogRecordHandle write(java.nio.ByteBuffer record, long time) {
+ public WriteAheadLogRecordHandle write(ByteBuffer record, long time) {
index += 1;
- records.add(new org.apache.spark.streaming.JavaWriteAheadLogSuite.Record(time, index, record));
+ records.add(new Record(time, index, record));
return new JavaWriteAheadLogSuiteHandle(index);
}
@Override
- public java.nio.ByteBuffer read(WriteAheadLogRecordHandle handle) {
+ public ByteBuffer read(WriteAheadLogRecordHandle handle) {
if (handle instanceof JavaWriteAheadLogSuiteHandle) {
int reqdIndex = ((JavaWriteAheadLogSuiteHandle) handle).index;
for (Record record: records) {
@@ -78,14 +80,13 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
}
@Override
- public java.util.Iterator<java.nio.ByteBuffer> readAll() {
- Collection<ByteBuffer> buffers = CollectionUtils.collect(records, new Transformer() {
+ public Iterator<ByteBuffer> readAll() {
+ return Iterators.transform(records.iterator(), new Function<Record,ByteBuffer>() {
@Override
- public Object transform(Object input) {
- return ((Record) input).buffer;
+ public ByteBuffer apply(Record input) {
+ return input.buffer;
}
});
- return buffers.iterator();
}
@Override
@@ -110,20 +111,21 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);
String data1 = "data1";
- WriteAheadLogRecordHandle handle = wal.write(ByteBuffer.wrap(data1.getBytes()), 1234);
+ WriteAheadLogRecordHandle handle =
+ wal.write(ByteBuffer.wrap(data1.getBytes(StandardCharsets.UTF_8)), 1234);
Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle);
- Assert.assertTrue(new String(wal.read(handle).array()).equals(data1));
+ Assert.assertEquals(new String(wal.read(handle).array(), StandardCharsets.UTF_8), data1);
- wal.write(ByteBuffer.wrap("data2".getBytes()), 1235);
- wal.write(ByteBuffer.wrap("data3".getBytes()), 1236);
- wal.write(ByteBuffer.wrap("data4".getBytes()), 1237);
+ wal.write(ByteBuffer.wrap("data2".getBytes(StandardCharsets.UTF_8)), 1235);
+ wal.write(ByteBuffer.wrap("data3".getBytes(StandardCharsets.UTF_8)), 1236);
+ wal.write(ByteBuffer.wrap("data4".getBytes(StandardCharsets.UTF_8)), 1237);
wal.clean(1236, false);
- java.util.Iterator<java.nio.ByteBuffer> dataIterator = wal.readAll();
- ArrayList<String> readData = new ArrayList<String>();
+ Iterator<ByteBuffer> dataIterator = wal.readAll();
+ List<String> readData = new ArrayList<>();
while (dataIterator.hasNext()) {
- readData.add(new String(dataIterator.next().array()));
+ readData.add(new String(dataIterator.next().array(), StandardCharsets.UTF_8));
}
- Assert.assertTrue(readData.equals(Arrays.asList("data3", "data4")));
+ Assert.assertEquals(readData, Arrays.asList("data3", "data4"));
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index a08578680c..068a6cb0e8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -100,8 +100,8 @@ class UISeleniumSuite
// Check stat table
val statTableHeaders = findAll(cssSelector("#stat-table th")).map(_.text).toSeq
statTableHeaders.exists(
- _.matches("Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)")) should be
- (true)
+ _.matches("Timelines \\(Last \\d+ batches, \\d+ active, \\d+ completed\\)")
+ ) should be (true)
statTableHeaders should contain ("Histograms")
val statTableCells = findAll(cssSelector("#stat-table td")).map(_.text).toSeq