aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-03-16 09:36:34 +0000
committerSean Owen <sowen@cloudera.com>2016-03-16 09:36:34 +0000
commit3b461d9ecd633c4fd659998b99e700d76f58d18a (patch)
tree09e9923fc17bada794d01bc365f130039972a8b7
parent05ab2948ab357fc07222bb3505df80b1886f7310 (diff)
downloadspark-3b461d9ecd633c4fd659998b99e700d76f58d18a.tar.gz
spark-3b461d9ecd633c4fd659998b99e700d76f58d18a.tar.bz2
spark-3b461d9ecd633c4fd659998b99e700d76f58d18a.zip
[SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, StandardCharset follow up
## What changes were proposed in this pull request? Follow up to https://github.com/apache/spark/pull/11657 - Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8` - And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests) - And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings ## How was this patch tested? Jenkins tests Author: Sean Owen <sowen@cloudera.com> Closes #11725 from srowen/SPARK-13823.2.
-rw-r--r--common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java94
-rw-r--r--common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java13
-rw-r--r--common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java38
-rw-r--r--common/sketch/pom.xml24
-rw-r--r--common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java8
-rw-r--r--common/unsafe/pom.xml2
-rw-r--r--common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java10
-rw-r--r--common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java8
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/api/r/SerDe.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala3
-rw-r--r--core/src/test/java/org/apache/spark/JavaAPISuite.java10
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java2
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java2
-rw-r--r--core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java2
-rw-r--r--core/src/test/scala/org/apache/spark/AccumulatorSuite.scala15
-rw-r--r--core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala8
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java33
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala4
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala4
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala9
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala1
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala3
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala3
41 files changed, 178 insertions, 184 deletions
diff --git a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
index c0ff9dc5f5..dd0171d1d1 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
@@ -36,6 +36,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -93,7 +94,7 @@ public class RequestTimeoutIntegrationSuite {
ByteBuffer message,
RpcResponseCallback callback) {
try {
- semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS);
+ semaphore.acquire();
callback.onSuccess(ByteBuffer.allocate(responseSize));
} catch (InterruptedException e) {
// do nothing
@@ -113,20 +114,17 @@ public class RequestTimeoutIntegrationSuite {
// First completes quickly (semaphore starts at 1).
TestCallback callback0 = new TestCallback();
- synchronized (callback0) {
- client.sendRpc(ByteBuffer.allocate(0), callback0);
- callback0.wait(FOREVER);
- assertEquals(responseSize, callback0.successLength);
- }
+ client.sendRpc(ByteBuffer.allocate(0), callback0);
+ callback0.latch.await();
+ assertEquals(responseSize, callback0.successLength);
// Second times out after 2 seconds, with slack. Must be IOException.
TestCallback callback1 = new TestCallback();
- synchronized (callback1) {
- client.sendRpc(ByteBuffer.allocate(0), callback1);
- callback1.wait(4 * 1000);
- assertNotNull(callback1.failure);
- assertTrue(callback1.failure instanceof IOException);
- }
+ client.sendRpc(ByteBuffer.allocate(0), callback1);
+ callback1.latch.await(4, TimeUnit.SECONDS);
+ assertNotNull(callback1.failure);
+ assertTrue(callback1.failure instanceof IOException);
+
semaphore.release();
}
@@ -143,7 +141,7 @@ public class RequestTimeoutIntegrationSuite {
ByteBuffer message,
RpcResponseCallback callback) {
try {
- semaphore.tryAcquire(FOREVER, TimeUnit.MILLISECONDS);
+ semaphore.acquire();
callback.onSuccess(ByteBuffer.allocate(responseSize));
} catch (InterruptedException e) {
// do nothing
@@ -164,24 +162,20 @@ public class RequestTimeoutIntegrationSuite {
TransportClient client0 =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
TestCallback callback0 = new TestCallback();
- synchronized (callback0) {
- client0.sendRpc(ByteBuffer.allocate(0), callback0);
- callback0.wait(FOREVER);
- assertTrue(callback0.failure instanceof IOException);
- assertFalse(client0.isActive());
- }
+ client0.sendRpc(ByteBuffer.allocate(0), callback0);
+ callback0.latch.await();
+ assertTrue(callback0.failure instanceof IOException);
+ assertFalse(client0.isActive());
// Increment the semaphore and the second request should succeed quickly.
semaphore.release(2);
TransportClient client1 =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
TestCallback callback1 = new TestCallback();
- synchronized (callback1) {
- client1.sendRpc(ByteBuffer.allocate(0), callback1);
- callback1.wait(FOREVER);
- assertEquals(responseSize, callback1.successLength);
- assertNull(callback1.failure);
- }
+ client1.sendRpc(ByteBuffer.allocate(0), callback1);
+ callback1.latch.await();
+ assertEquals(responseSize, callback1.successLength);
+ assertNull(callback1.failure);
}
// The timeout is relative to the LAST request sent, which is kinda weird, but still.
@@ -226,18 +220,14 @@ public class RequestTimeoutIntegrationSuite {
client.fetchChunk(0, 1, callback1);
Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
- synchronized (callback0) {
- // not complete yet, but should complete soon
- assertEquals(-1, callback0.successLength);
- assertNull(callback0.failure);
- callback0.wait(2 * 1000);
- assertTrue(callback0.failure instanceof IOException);
- }
+ // not complete yet, but should complete soon
+ assertEquals(-1, callback0.successLength);
+ assertNull(callback0.failure);
+ callback0.latch.await(2, TimeUnit.SECONDS);
+ assertTrue(callback0.failure instanceof IOException);
- synchronized (callback1) {
- // failed at same time as previous
- assertTrue(callback0.failure instanceof IOException);
- }
+ // failed at same time as previous
+ assertTrue(callback1.failure instanceof IOException);
}
/**
@@ -248,41 +238,35 @@ public class RequestTimeoutIntegrationSuite {
int successLength = -1;
Throwable failure;
+ final CountDownLatch latch = new CountDownLatch(1);
@Override
public void onSuccess(ByteBuffer response) {
- synchronized(this) {
- successLength = response.remaining();
- this.notifyAll();
- }
+ successLength = response.remaining();
+ latch.countDown();
}
@Override
public void onFailure(Throwable e) {
- synchronized(this) {
- failure = e;
- this.notifyAll();
- }
+ failure = e;
+ latch.countDown();
}
@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
- synchronized(this) {
- try {
- successLength = buffer.nioByteBuffer().remaining();
- this.notifyAll();
- } catch (IOException e) {
- // weird
- }
+ try {
+ successLength = buffer.nioByteBuffer().remaining();
+ } catch (IOException e) {
+ // weird
+ } finally {
+ latch.countDown();
}
}
@Override
public void onFailure(int chunkIndex, Throwable e) {
- synchronized(this) {
- failure = e;
- this.notifyAll();
- }
+ failure = e;
+ latch.countDown();
}
}
}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 045773317a..45cc03df43 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -276,7 +277,7 @@ public class SparkSaslSuite {
ctx = new SaslTestCtx(rpcHandler, true, false);
- final Object lock = new Object();
+ final CountDownLatch lock = new CountDownLatch(1);
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
doAnswer(new Answer<Void>() {
@@ -284,17 +285,13 @@ public class SparkSaslSuite {
public Void answer(InvocationOnMock invocation) {
response.set((ManagedBuffer) invocation.getArguments()[1]);
response.get().retain();
- synchronized (lock) {
- lock.notifyAll();
- }
+ lock.countDown();
return null;
}
}).when(callback).onSuccess(anyInt(), any(ManagedBuffer.class));
- synchronized (lock) {
- ctx.client.fetchChunk(0, 0, callback);
- lock.wait(10 * 1000);
- }
+ ctx.client.fetchChunk(0, 0, callback);
+ lock.await(10, TimeUnit.SECONDS);
verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class));
verify(callback, never()).onFailure(anyInt(), any(Throwable.class));
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 0ea631ea14..5322fcd781 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.network.sasl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
@@ -197,26 +198,23 @@ public class SaslIntegrationSuite {
final AtomicReference<Throwable> exception = new AtomicReference<>();
+ final CountDownLatch blockFetchLatch = new CountDownLatch(1);
BlockFetchingListener listener = new BlockFetchingListener() {
@Override
- public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
- notifyAll();
+ public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+ blockFetchLatch.countDown();
}
-
@Override
- public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
+ public void onBlockFetchFailure(String blockId, Throwable t) {
exception.set(t);
- notifyAll();
+ blockFetchLatch.countDown();
}
};
- String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" };
- OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0",
- blockIds, listener);
- synchronized (listener) {
- fetcher.start();
- listener.wait();
- }
+ String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
+ OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
+ fetcher.start();
+ blockFetchLatch.await();
checkSecurityException(exception.get());
// Register an executor so that the next steps work.
@@ -240,24 +238,22 @@ public class SaslIntegrationSuite {
client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
blockServer.getPort());
+ final CountDownLatch chunkReceivedLatch = new CountDownLatch(1);
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
- public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) {
- notifyAll();
+ public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+ chunkReceivedLatch.countDown();
}
-
@Override
- public synchronized void onFailure(int chunkIndex, Throwable t) {
+ public void onFailure(int chunkIndex, Throwable t) {
exception.set(t);
- notifyAll();
+ chunkReceivedLatch.countDown();
}
};
exception.set(null);
- synchronized (callback) {
- client2.fetchChunk(streamId, 0, callback);
- callback.wait();
- }
+ client2.fetchChunk(streamId, 0, callback);
+ chunkReceivedLatch.await();
checkSecurityException(exception.get());
} finally {
if (client1 != null) {
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index 442043cb51..8bc1f52798 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -45,5 +45,29 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <configuration>
+ <javacArgs combine.children="append">
+ <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
+ <javacArg>-XDignore.symbol.file</javacArg>
+ </javacArgs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <compilerArgs combine.children="append">
+ <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
+ <arg>-XDignore.symbol.file</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
</project>
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
index feb601d44f..81461f0300 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
@@ -17,15 +17,11 @@
package org.apache.spark.util.sketch;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
class Utils {
public static byte[] getBytesFromUTF8String(String str) {
- try {
- return str.getBytes("utf-8");
- } catch (UnsupportedEncodingException e) {
- throw new IllegalArgumentException("Only support utf-8 string", e);
- }
+ return str.getBytes(StandardCharsets.UTF_8);
}
public static long integralToLong(Object i) {
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index 5250014739..93b9580f26 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -98,7 +98,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <compilerArgs>
+ <compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<arg>-XDignore.symbol.file</arg>
</compilerArgs>
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index e16166ade4..54a5456924 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -106,15 +106,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
* Creates an UTF8String from String.
*/
public static UTF8String fromString(String str) {
- if (str == null) return null;
- try {
- return fromBytes(str.getBytes("utf-8"));
- } catch (UnsupportedEncodingException e) {
- // Turn the exception into unchecked so we can find out about it at runtime, but
- // don't need to add lots of boilerplate code everywhere.
- throwException(e);
- return null;
- }
+ return str == null ? null : fromBytes(str.getBytes(StandardCharsets.UTF_8));
}
/**
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
index bef5d712cf..d4160ad029 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
@@ -17,7 +17,7 @@
package org.apache.spark.unsafe.types;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
@@ -30,9 +30,9 @@ import static org.apache.spark.unsafe.types.UTF8String.*;
public class UTF8StringSuite {
- private static void checkBasic(String str, int len) throws UnsupportedEncodingException {
+ private static void checkBasic(String str, int len) {
UTF8String s1 = fromString(str);
- UTF8String s2 = fromBytes(str.getBytes("utf8"));
+ UTF8String s2 = fromBytes(str.getBytes(StandardCharsets.UTF_8));
assertEquals(s1.numChars(), len);
assertEquals(s2.numChars(), len);
@@ -51,7 +51,7 @@ public class UTF8StringSuite {
}
@Test
- public void basicTest() throws UnsupportedEncodingException {
+ public void basicTest() {
checkBasic("", 0);
checkBasic("hello", 5);
checkBasic("大 千 世 界", 7);
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index b0d858486b..55db938f09 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -18,6 +18,7 @@
package org.apache.spark.api.python
import java.nio.ByteOrder
+import java.nio.charset.StandardCharsets
import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._
@@ -68,7 +69,8 @@ private[spark] object SerDeUtil extends Logging {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
- val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
+ // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly
+ val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1)
construct(typecode, machineCodes(typecode), data)
} else {
super.construct(args)
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index c7fb192f26..48df5bedd6 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -410,7 +410,7 @@ private[spark] object SerDe {
}
def writeString(out: DataOutputStream, value: String): Unit = {
- val utf8 = value.getBytes("UTF-8")
+ val utf8 = value.getBytes(StandardCharsets.UTF_8)
val len = utf8.length
out.writeInt(len)
out.write(utf8, 0, len)
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index 1a8e545b4f..d17a7894fd 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -72,7 +72,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
val bos = new ByteArrayOutputStream()
val out = codec.compressedOutputStream(bos)
- out.write(schema.toString.getBytes("UTF-8"))
+ out.write(schema.toString.getBytes(StandardCharsets.UTF_8))
out.close()
bos.toByteArray
})
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
index 202a5191ad..f6a9f9c557 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.status.api.v1
import java.io.OutputStream
import java.lang.annotation.Annotation
import java.lang.reflect.Type
+import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util.{Calendar, SimpleTimeZone}
import javax.ws.rs.Produces
@@ -68,7 +69,7 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
multivaluedMap: MultivaluedMap[String, AnyRef],
outputStream: OutputStream): Unit = {
t match {
- case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
+ case ErrorWrapper(err) => outputStream.write(err.getBytes(StandardCharsets.UTF_8))
case _ => mapper.writeValue(outputStream, t)
}
}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index a7e74c0079..c1036b8fac 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1068,8 +1068,8 @@ public class JavaAPISuite implements Serializable {
@Test
public void wholeTextFiles() throws Exception {
- byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
- byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8");
+ byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
+ byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8);
String tempDirName = tempDir.getAbsolutePath();
Files.write(content1, new File(tempDirName + "/part-00000"));
@@ -1131,7 +1131,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void binaryFiles() throws Exception {
// Reusing the wholeText files example
- byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+ byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
String tempDirName = tempDir.getAbsolutePath();
File file1 = new File(tempDirName + "/part-00000");
@@ -1152,7 +1152,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void binaryFilesCaching() throws Exception {
// Reusing the wholeText files example
- byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+ byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
String tempDirName = tempDir.getAbsolutePath();
File file1 = new File(tempDirName + "/part-00000");
@@ -1181,7 +1181,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void binaryRecords() throws Exception {
// Reusing the wholeText files example
- byte[] content1 = "spark isn't always easy to use.\n".getBytes("utf-8");
+ byte[] content1 = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8);
int numOfCopies = 10;
String tempDirName = tempDir.getAbsolutePath();
File file1 = new File(tempDirName + "/part-00000");
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index a3502708aa..4cd3600df1 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -80,7 +80,7 @@ public class ShuffleInMemorySorterSuite {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
}
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
- final byte[] strBytes = str.getBytes("utf-8");
+ final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
Platform.putInt(baseObject, position, strBytes.length);
position += 4;
Platform.copyMemory(
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 90849ab0bd..483319434d 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -80,7 +80,7 @@ public class UnsafeInMemorySorterSuite {
// Write the records into the data page:
long position = dataPage.getBaseOffset();
for (String str : dataToSort) {
- final byte[] strBytes = str.getBytes("utf-8");
+ final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
Platform.putInt(baseObject, position, strBytes.length);
position += 4;
Platform.copyMemory(
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
index f914081d7d..94f5805853 100644
--- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -31,7 +31,6 @@ public class JavaTaskContextCompileCheck {
tc.isCompleted();
tc.isInterrupted();
- tc.isRunningLocally();
tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());
tc.addTaskFailureListener(new JavaTaskFailureListenerImpl());
@@ -53,7 +52,6 @@ public class JavaTaskContextCompileCheck {
context.isInterrupted();
context.stageId();
context.partitionId();
- context.isRunningLocally();
context.addTaskCompletionListener(this);
}
}
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 61ab24051e..ec192a8543 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark
+import java.util.concurrent.Semaphore
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
@@ -341,7 +342,7 @@ private class SaveInfoListener extends SparkListener {
// Callback to call when a job completes. Parameter is job ID.
@GuardedBy("this")
private var jobCompletionCallback: () => Unit = null
- private var calledJobCompletionCallback: Boolean = false
+ private val jobCompletionSem = new Semaphore(0)
private var exception: Throwable = null
def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
@@ -353,12 +354,9 @@ private class SaveInfoListener extends SparkListener {
* If `jobCompletionCallback` is set, block until the next call has finished.
* If the callback failed with an exception, throw it.
*/
- def awaitNextJobCompletion(): Unit = synchronized {
+ def awaitNextJobCompletion(): Unit = {
if (jobCompletionCallback != null) {
- while (!calledJobCompletionCallback) {
- wait()
- }
- calledJobCompletionCallback = false
+ jobCompletionSem.acquire()
if (exception != null) {
exception = null
throw exception
@@ -374,7 +372,7 @@ private class SaveInfoListener extends SparkListener {
jobCompletionCallback = callback
}
- override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
if (jobCompletionCallback != null) {
try {
jobCompletionCallback()
@@ -383,8 +381,7 @@ private class SaveInfoListener extends SparkListener {
// Otherwise, if `jobCompletionCallback` threw something it wouldn't fail the test.
case NonFatal(e) => exception = e
} finally {
- calledJobCompletionCallback = true
- notify()
+ jobCompletionSem.release()
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index d91f50f18f..088b05403c 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -285,8 +285,8 @@ class TaskMetricsSuite extends SparkFunSuite {
// set and increment values
in.setBytesRead(1L)
in.setBytesRead(2L)
- in.incRecordsRead(1L)
- in.incRecordsRead(2L)
+ in.incRecordsReadInternal(1L)
+ in.incRecordsReadInternal(2L)
in.setReadMethod(DataReadMethod.Disk)
// assert new values exist
assertValEquals(_.bytesRead, BYTES_READ, 2L)
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 02806a16b9..6da18cfd49 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -121,7 +121,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
new InputStreamReader(buf.createInputStream(), StandardCharsets.UTF_8))
actualString should equal(blockString)
buf.release()
- Success()
+ Success(())
case Failure(t) =>
Failure(t)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 55f4190680..2293c11dad 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
import java.util.Properties
+import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.language.reflectiveCalls
import scala.util.control.NonFatal
@@ -67,7 +68,7 @@ class MyRDD(
numPartitions: Int,
dependencies: List[Dependency[_]],
locations: Seq[Seq[String]] = Nil,
- @transient tracker: MapOutputTrackerMaster = null)
+ @(transient @param) tracker: MapOutputTrackerMaster = null)
extends RDD[(Int, Int)](sc, dependencies) with Serializable {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
index bdee889cdc..f019b1e259 100644
--- a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.serializer
import java.io._
+import scala.annotation.meta.param
+
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
@@ -219,7 +221,7 @@ class SerializableClassWithWriteObject(val objectField: Object) extends Serializ
}
-class SerializableClassWithWriteReplace(@transient replacementFieldObject: Object)
+class SerializableClassWithWriteReplace(@(transient @param) replacementFieldObject: Object)
extends Serializable {
private def writeReplace(): Object = {
replacementFieldObject
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index d30eafd2d4..4d938d5c97 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -196,7 +196,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
test("file appender async close stream abruptly") {
// Test FileAppender reaction to closing InputStream using a mock logging appender
val mockAppender = mock(classOf[Appender])
- val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+ val loggingEventCaptor = ArgumentCaptor.forClass(classOf[LoggingEvent])
// Make sure only logging errors
val logger = Logger.getRootLogger
@@ -223,7 +223,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
test("file appender async close stream gracefully") {
// Test FileAppender reaction to closing InputStream using a mock logging appender
val mockAppender = mock(classOf[Appender])
- val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+ val loggingEventCaptor = ArgumentCaptor.forClass(classOf[LoggingEvent])
// Make sure only logging errors
val logger = Logger.getRootLogger
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index de6f408fa8..6a2d4c9f2c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -853,7 +853,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
if (hasHadoopInput) {
val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
inputMetrics.setBytesRead(d + e + f)
- inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
+ inputMetrics.incRecordsReadInternal(if (hasRecords) (d + e + f) / 100 else -1)
} else {
val sr = t.registerTempShuffleReadMetrics()
sr.incRemoteBytesRead(b + d)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
index c12f784471..dda8bee222 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util.collection.unsafe.sort
+import java.nio.charset.StandardCharsets
+
import com.google.common.primitives.UnsignedBytes
import org.scalatest.prop.PropertyChecks
@@ -87,10 +89,12 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
// scalastyle:on
forAll (regressionTests) { (s1: String, s2: String) =>
- testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8"))
+ testPrefixComparison(
+ s1.getBytes(StandardCharsets.UTF_8), s2.getBytes(StandardCharsets.UTF_8))
}
forAll { (s1: String, s2: String) =>
- testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8"))
+ testPrefixComparison(
+ s1.getBytes(StandardCharsets.UTF_8), s2.getBytes(StandardCharsets.UTF_8))
}
}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index dc8fbb58d8..13f72b757f 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -23,11 +23,11 @@ import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
import static org.apache.spark.launcher.LauncherProtocol.*;
@@ -69,44 +69,31 @@ public class LauncherServerSuite extends BaseSuite {
Socket s = new Socket(InetAddress.getLoopbackAddress(),
LauncherServer.getServerInstance().getPort());
- final Object waitLock = new Object();
+ final Semaphore semaphore = new Semaphore(0);
handle.addListener(new SparkAppHandle.Listener() {
@Override
public void stateChanged(SparkAppHandle handle) {
- wakeUp();
+ semaphore.release();
}
-
@Override
public void infoChanged(SparkAppHandle handle) {
- wakeUp();
- }
-
- private void wakeUp() {
- synchronized (waitLock) {
- waitLock.notifyAll();
- }
+ semaphore.release();
}
});
client = new TestClient(s);
- synchronized (waitLock) {
- client.send(new Hello(handle.getSecret(), "1.4.0"));
- waitLock.wait(TimeUnit.SECONDS.toMillis(10));
- }
+ client.send(new Hello(handle.getSecret(), "1.4.0"));
+ semaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
// Make sure the server matched the client to the handle.
assertNotNull(handle.getConnection());
- synchronized (waitLock) {
- client.send(new SetAppId("app-id"));
- waitLock.wait(TimeUnit.SECONDS.toMillis(10));
- }
+ client.send(new SetAppId("app-id"));
+ semaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
assertEquals("app-id", handle.getAppId());
- synchronized (waitLock) {
- client.send(new SetState(SparkAppHandle.State.RUNNING));
- waitLock.wait(TimeUnit.SECONDS.toMillis(10));
- }
+ client.send(new SetState(SparkAppHandle.State.RUNNING));
+ semaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
assertEquals(SparkAppHandle.State.RUNNING, handle.getState());
handle.stop();
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 53935f328a..1a58779055 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1205,7 +1205,6 @@ private[python] class PythonMLLibAPI extends Serializable {
private[spark] object SerDe extends Serializable {
val PYSPARK_PACKAGE = "pyspark.mllib"
- val LATIN1 = "ISO-8859-1"
/**
* Base class used for pickle
@@ -1253,7 +1252,8 @@ private[spark] object SerDe extends Serializable {
if (obj.getClass.isArray) {
obj.asInstanceOf[Array[Byte]]
} else {
- obj.asInstanceOf[String].getBytes(LATIN1)
+ // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly
+ obj.asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index d6ac4040b7..bd674dadd0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -444,7 +444,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Hex(Literal("helloHex".getBytes(StandardCharsets.UTF_8))), "68656C6C6F486578")
// scalastyle:off
// Turn off scala style for non-ascii chars
- checkEvaluation(Hex(Literal("三重的".getBytes("UTF8"))), "E4B889E9878DE79A84")
+ checkEvaluation(Hex(Literal("三重的".getBytes(StandardCharsets.UTF_8))), "E4B889E9878DE79A84")
// scalastyle:on
Seq(LongType, BinaryType, StringType).foreach { dt =>
checkConsistencyBetweenInterpretedAndCodegen(Hex.apply _, dt)
@@ -460,7 +460,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Unhex(Literal("GG")), null)
// scalastyle:off
// Turn off scala style for non-ascii chars
- checkEvaluation(Unhex(Literal("E4B889E9878DE79A84")), "三重的".getBytes("UTF-8"))
+ checkEvaluation(Unhex(Literal("E4B889E9878DE79A84")), "三重的".getBytes(StandardCharsets.UTF_8))
checkEvaluation(Unhex(Literal("三重的")), null)
// scalastyle:on
checkConsistencyBetweenInterpretedAndCodegen(Unhex, StringType)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 7d768b165f..7234726633 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -846,8 +846,9 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
" as the dictionary was missing for encoding " + dataEncoding);
}
if (vectorizedDecode()) {
- if (dataEncoding != Encoding.PLAIN_DICTIONARY &&
- dataEncoding != Encoding.RLE_DICTIONARY) {
+ @SuppressWarnings("deprecation")
+ Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
+ if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
}
this.dataColumn = new VectorizedRleValuesReader();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 8c46516594..da28ec4f53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.python
import java.io.OutputStream
+import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
@@ -136,7 +137,7 @@ object EvaluatePython {
case (c, StringType) => UTF8String.fromString(c.toString)
- case (c: String, BinaryType) => c.getBytes("utf-8")
+ case (c: String, BinaryType) => c.getBytes(StandardCharsets.UTF_8)
case (c, BinaryType) if c.getClass.isArray && c.getClass.getComponentType.getName == "byte" => c
case (c: java.util.List[_], ArrayType(elementType, _)) =>
@@ -185,7 +186,8 @@ object EvaluatePython {
def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
out.write(Opcodes.GLOBAL)
- out.write((module + "\n" + "_parse_datatype_json_string" + "\n").getBytes("utf-8"))
+ out.write(
+ (module + "\n" + "_parse_datatype_json_string" + "\n").getBytes(StandardCharsets.UTF_8))
val schema = obj.asInstanceOf[StructType]
pickler.save(schema.json)
out.write(Opcodes.TUPLE1)
@@ -209,7 +211,8 @@ object EvaluatePython {
def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
if (obj == this) {
out.write(Opcodes.GLOBAL)
- out.write((module + "\n" + "_create_row_inbound_converter" + "\n").getBytes("utf-8"))
+ out.write(
+ (module + "\n" + "_create_row_inbound_converter" + "\n").getBytes(StandardCharsets.UTF_8))
} else {
// it will be memorized by Pickler to save some bytes
pickler.save(this)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
index 9ca8c4d2ed..9d7570fe7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.columnar
import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.charset.StandardCharsets
import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.sql.Row
@@ -67,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
checkActualSize(LONG, Long.MaxValue, 8)
checkActualSize(FLOAT, Float.MaxValue, 4)
checkActualSize(DOUBLE, Double.MaxValue, 8)
- checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
+ checkActualSize(STRING, "hello", 4 + "hello".getBytes(StandardCharsets.UTF_8).length)
checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4)
checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 6e21d5a061..0940878e38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.columnar
+import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.{QueryTest, Row}
@@ -160,7 +161,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
sparkContext.parallelize((1 to 10000), 10).map { i =>
Row(
s"str${i}: test cache.",
- s"binary${i}: test cache.".getBytes("UTF-8"),
+ s"binary${i}: test cache.".getBytes(StandardCharsets.UTF_8),
null,
i % 2 == 0,
i.toByte,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 4671b2dca9..4a8c128fa9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.json
import java.io.{File, StringWriter}
+import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
@@ -27,7 +28,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
-import org.scalactic.Tolerance._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -1292,7 +1292,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val constantValues =
Seq(
- "a string in binary".getBytes("UTF-8"),
+ "a string in binary".getBytes(StandardCharsets.UTF_8),
null,
true,
1.toByte,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
index 36b929ee1f..f98ea8c5ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.execution.datasources.parquet
-import java.io.File
import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._
@@ -59,7 +59,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared
.setLongColumn(i.toLong * 10)
.setFloatColumn(i.toFloat + 0.1f)
.setDoubleColumn(i.toDouble + 0.2d)
- .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes("UTF-8")))
+ .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes(StandardCharsets.UTF_8)))
.setStringColumn(s"val_$i")
.build())
}
@@ -74,7 +74,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared
i.toLong * 10,
i.toFloat + 0.1f,
i.toDouble + 0.2d,
- s"val_$i".getBytes("UTF-8"),
+ s"val_$i".getBytes(StandardCharsets.UTF_8),
s"val_$i")
})
}
@@ -103,7 +103,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared
.setMaybeLongColumn(i.toLong * 10)
.setMaybeFloatColumn(i.toFloat + 0.1f)
.setMaybeDoubleColumn(i.toDouble + 0.2d)
- .setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes("UTF-8")))
+ .setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes(StandardCharsets.UTF_8)))
.setMaybeStringColumn(s"val_$i")
.build()
}
@@ -124,7 +124,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared
i.toLong * 10,
i.toFloat + 0.1f,
i.toDouble + 0.2d,
- s"val_$i".getBytes("UTF-8"),
+ s"val_$i".getBytes(StandardCharsets.UTF_8),
s"val_$i")
}
})
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index a64df435d8..b394ffb366 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources.parquet
+import java.nio.charset.StandardCharsets
+
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
@@ -260,7 +262,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
// See https://issues.apache.org/jira/browse/SPARK-11153
ignore("filter pushdown - binary") {
implicit class IntToBinary(int: Int) {
- def b: Array[Byte] = int.toString.getBytes("UTF-8")
+ def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
}
withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index 547d06236b..1c8b2ea808 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -81,6 +81,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
val buildSide = joinType match {
case LeftOuter => BuildRight
case RightOuter => BuildLeft
+ case _ => fail(s"Unsupported join type $joinType")
}
extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index b6e2f1f6b3..3b53716898 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -272,7 +272,7 @@ private class ScriptTransformationWriterThread(
sb.append(ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
sb.toString()
}
- outputStream.write(data.getBytes("utf-8"))
+ outputStream.write(data.getBytes(StandardCharsets.UTF_8))
} else {
val writable = inputSerde.serialize(
row.asInstanceOf[GenericInternalRow].values, inputSoi)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 5e452d107d..d21bb573d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution
import java.io._
+import java.nio.charset.StandardCharsets
import scala.util.control.NonFatal
@@ -127,7 +128,7 @@ abstract class HiveComparisonTest
protected val cacheDigest = java.security.MessageDigest.getInstance("MD5")
protected def getMd5(str: String): String = {
val digest = java.security.MessageDigest.getInstance("MD5")
- digest.update(str.replaceAll(System.lineSeparator(), "\n").getBytes("utf-8"))
+ digest.update(str.replaceAll(System.lineSeparator(), "\n").getBytes(StandardCharsets.UTF_8))
new java.math.BigInteger(1, digest.digest).toString(16)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index d76d0c44f5..7b0c7a9f00 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive.orc
+import java.nio.charset.StandardCharsets
+
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
@@ -190,7 +192,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
test("filter pushdown - binary") {
implicit class IntToBinary(int: Int) {
- def b: Array[Byte] = int.toString.getBytes("UTF-8")
+ def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
}
withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 57c4ad4248..c395d361a1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -72,7 +72,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
test("Read/write binary data") {
- withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file =>
+ withOrcFile(BinaryData("test".getBytes(StandardCharsets.UTF_8)) :: Nil) { file =>
val bytes = read.orc(file).head().getAs[Array[Byte]](0)
assert(new String(bytes, StandardCharsets.UTF_8) === "test")
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index cfd7f86f84..7654bb2d03 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -105,7 +105,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
/** An input DStream with for testing rate controlling */
-private[streaming] class RateTestInputDStream(@transient _ssc: StreamingContext)
+private[streaming] class RateTestInputDStream(_ssc: StreamingContext)
extends ReceiverInputDStream[Int](_ssc) {
override def getReceiver(): Receiver[Int] = new RateTestReceiver(id)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
index 78fc344b00..6d9c80d992 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.util
import java.io.ByteArrayOutputStream
+import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit._
import org.apache.spark.SparkFunSuite
@@ -34,7 +35,7 @@ class RateLimitedOutputStreamSuite extends SparkFunSuite {
val underlying = new ByteArrayOutputStream
val data = "X" * 41000
val stream = new RateLimitedOutputStream(underlying, desiredBytesPerSec = 10000)
- val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
+ val elapsedNs = benchmark { stream.write(data.getBytes(StandardCharsets.UTF_8)) }
val seconds = SECONDS.convert(elapsedNs, NANOSECONDS)
assert(seconds >= 4, s"Seconds value ($seconds) is less than 4.")