author     Sean Owen <sowen@cloudera.com>   2016-03-16 09:36:34 +0000
committer  Sean Owen <sowen@cloudera.com>   2016-03-16 09:36:34 +0000
commit     3b461d9ecd633c4fd659998b99e700d76f58d18a (patch)
tree       09e9923fc17bada794d01bc365f130039972a8b7 /core
parent     05ab2948ab357fc07222bb3505df80b1886f7310 (diff)
[SPARK-13823][SPARK-13397][SPARK-13395][CORE] More warnings, StandardCharset follow up
## What changes were proposed in this pull request?

Follow up to https://github.com/apache/spark/pull/11657

- Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8`
- And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests)
- And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #11725 from srowen/SPARK-13823.2.
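The recurring change in this patch is swapping charset-by-name lookups for the constants in `java.nio.charset.StandardCharsets`. A minimal standalone sketch of the before/after pattern (the string literal is only illustrative, not taken from the patch):

```scala
import java.nio.charset.StandardCharsets

object CharsetExample {
  def main(args: Array[String]): Unit = {
    val s = "spark is easy to use.\n"

    // By name: the charset is resolved from a string on every call, and in
    // Java this overload also forces callers to handle the checked
    // UnsupportedEncodingException.
    val byName = s.getBytes("UTF-8")

    // By constant: StandardCharsets.UTF_8 is always present, so there is no
    // lookup, no checked exception, and no risk of a typo in the name.
    val byConstant = s.getBytes(StandardCharsets.UTF_8)

    assert(byName.sameElements(byConstant))
  }
}
```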
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala                                |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/SerDe.scala                                         |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala                    |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala                  |  3
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                                          | 10
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java               |  2
-rw-r--r--  core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java |  2
-rw-r--r--  core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java                      |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala                                    | 15
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala                           |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala       |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala                         |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala               |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala                              |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala                              |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala  |  8
16 files changed, 37 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index b0d858486b..55db938f09 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -18,6 +18,7 @@
package org.apache.spark.api.python
import java.nio.ByteOrder
+import java.nio.charset.StandardCharsets
import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._
@@ -68,7 +69,8 @@ private[spark] object SerDeUtil extends Logging {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
- val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
+ // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly
+ val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1)
construct(typecode, machineCodes(typecode), data)
} else {
super.construct(args)
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index c7fb192f26..48df5bedd6 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -410,7 +410,7 @@ private[spark] object SerDe {
}
def writeString(out: DataOutputStream, value: String): Unit = {
- val utf8 = value.getBytes("UTF-8")
+ val utf8 = value.getBytes(StandardCharsets.UTF_8)
val len = utf8.length
out.writeInt(len)
out.write(utf8, 0, len)
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index 1a8e545b4f..d17a7894fd 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -72,7 +72,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
val bos = new ByteArrayOutputStream()
val out = codec.compressedOutputStream(bos)
- out.write(schema.toString.getBytes("UTF-8"))
+ out.write(schema.toString.getBytes(StandardCharsets.UTF_8))
out.close()
bos.toByteArray
})
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
index 202a5191ad..f6a9f9c557 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.status.api.v1
import java.io.OutputStream
import java.lang.annotation.Annotation
import java.lang.reflect.Type
+import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util.{Calendar, SimpleTimeZone}
import javax.ws.rs.Produces
@@ -68,7 +69,7 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
multivaluedMap: MultivaluedMap[String, AnyRef],
outputStream: OutputStream): Unit = {
t match {
- case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
+ case ErrorWrapper(err) => outputStream.write(err.getBytes(StandardCharsets.UTF_8))
case _ => mapper.writeValue(outputStream, t)
}
}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index a7e74c0079..c1036b8fac 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1068,8 +1068,8 @@ public class JavaAPISuite implements Serializable {
@Test
public void wholeTextFiles() throws Exception {
- byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
- byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8");
+ byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
+ byte[] content2 = "spark is also easy to use.\n".getBytes(StandardCharsets.UTF_8);
String tempDirName = tempDir.getAbsolutePath();
Files.write(content1, new File(tempDirName + "/part-00000"));
@@ -1131,7 +1131,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void binaryFiles() throws Exception {
// Reusing the wholeText files example
- byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+ byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
String tempDirName = tempDir.getAbsolutePath();
File file1 = new File(tempDirName + "/part-00000");
@@ -1152,7 +1152,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void binaryFilesCaching() throws Exception {
// Reusing the wholeText files example
- byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+ byte[] content1 = "spark is easy to use.\n".getBytes(StandardCharsets.UTF_8);
String tempDirName = tempDir.getAbsolutePath();
File file1 = new File(tempDirName + "/part-00000");
@@ -1181,7 +1181,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void binaryRecords() throws Exception {
// Reusing the wholeText files example
- byte[] content1 = "spark isn't always easy to use.\n".getBytes("utf-8");
+ byte[] content1 = "spark isn't always easy to use.\n".getBytes(StandardCharsets.UTF_8);
int numOfCopies = 10;
String tempDirName = tempDir.getAbsolutePath();
File file1 = new File(tempDirName + "/part-00000");
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index a3502708aa..4cd3600df1 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -80,7 +80,7 @@ public class ShuffleInMemorySorterSuite {
sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
}
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
- final byte[] strBytes = str.getBytes("utf-8");
+ final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
Platform.putInt(baseObject, position, strBytes.length);
position += 4;
Platform.copyMemory(
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 90849ab0bd..483319434d 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -80,7 +80,7 @@ public class UnsafeInMemorySorterSuite {
// Write the records into the data page:
long position = dataPage.getBaseOffset();
for (String str : dataToSort) {
- final byte[] strBytes = str.getBytes("utf-8");
+ final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
Platform.putInt(baseObject, position, strBytes.length);
position += 4;
Platform.copyMemory(
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
index f914081d7d..94f5805853 100644
--- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -31,7 +31,6 @@ public class JavaTaskContextCompileCheck {
tc.isCompleted();
tc.isInterrupted();
- tc.isRunningLocally();
tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());
tc.addTaskFailureListener(new JavaTaskFailureListenerImpl());
@@ -53,7 +52,6 @@ public class JavaTaskContextCompileCheck {
context.isInterrupted();
context.stageId();
context.partitionId();
- context.isRunningLocally();
context.addTaskCompletionListener(this);
}
}
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 61ab24051e..ec192a8543 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark
+import java.util.concurrent.Semaphore
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
@@ -341,7 +342,7 @@ private class SaveInfoListener extends SparkListener {
// Callback to call when a job completes. Parameter is job ID.
@GuardedBy("this")
private var jobCompletionCallback: () => Unit = null
- private var calledJobCompletionCallback: Boolean = false
+ private val jobCompletionSem = new Semaphore(0)
private var exception: Throwable = null
def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
@@ -353,12 +354,9 @@ private class SaveInfoListener extends SparkListener {
* If `jobCompletionCallback` is set, block until the next call has finished.
* If the callback failed with an exception, throw it.
*/
- def awaitNextJobCompletion(): Unit = synchronized {
+ def awaitNextJobCompletion(): Unit = {
if (jobCompletionCallback != null) {
- while (!calledJobCompletionCallback) {
- wait()
- }
- calledJobCompletionCallback = false
+ jobCompletionSem.acquire()
if (exception != null) {
exception = null
throw exception
@@ -374,7 +372,7 @@ private class SaveInfoListener extends SparkListener {
jobCompletionCallback = callback
}
- override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
if (jobCompletionCallback != null) {
try {
jobCompletionCallback()
@@ -383,8 +381,7 @@ private class SaveInfoListener extends SparkListener {
// Otherwise, if `jobCompletionCallback` threw something it wouldn't fail the test.
case NonFatal(e) => exception = e
} finally {
- calledJobCompletionCallback = true
- notify()
+ jobCompletionSem.release()
}
}
}
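The AccumulatorSuite hunk above replaces a guarded `wait()`/`notify()` handshake with a `java.util.concurrent.Semaphore`. A self-contained sketch of the same signalling pattern, using invented names (`JobCompletionSignal` is not a Spark class):

```scala
import java.util.concurrent.Semaphore

// The producer releases one permit per completed job; the consumer blocks
// until a permit is available. No shared lock, guard flag, or wait() loop is
// needed, and a release that happens before acquire() is not lost.
class JobCompletionSignal {
  private val sem = new Semaphore(0)

  def jobFinished(): Unit = sem.release()

  def awaitNextJobCompletion(): Unit = sem.acquire()
}

object JobCompletionSignalDemo {
  def main(args: Array[String]): Unit = {
    val signal = new JobCompletionSignal
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(100)            // simulate the job running
        signal.jobFinished()
      }
    })
    worker.start()
    signal.awaitNextJobCompletion()  // returns once jobFinished() has run
    println("job observed as complete")
  }
}
```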
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index d91f50f18f..088b05403c 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -285,8 +285,8 @@ class TaskMetricsSuite extends SparkFunSuite {
// set and increment values
in.setBytesRead(1L)
in.setBytesRead(2L)
- in.incRecordsRead(1L)
- in.incRecordsRead(2L)
+ in.incRecordsReadInternal(1L)
+ in.incRecordsReadInternal(2L)
in.setReadMethod(DataReadMethod.Disk)
// assert new values exist
assertValEquals(_.bytesRead, BYTES_READ, 2L)
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 02806a16b9..6da18cfd49 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -121,7 +121,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
new InputStreamReader(buf.createInputStream(), StandardCharsets.UTF_8))
actualString should equal(blockString)
buf.release()
- Success()
+ Success(())
case Failure(t) =>
Failure(t)
}
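`Success()` only compiles because scalac adapts the empty argument list by inserting the `Unit` value, which trips an "adapting argument list" lint warning; `Success(())` passes `()` explicitly. A tiny sketch of the difference (assuming a Scala 2.x compiler with that lint enabled, e.g. `-Ywarn-adapted-args`):

```scala
import scala.util.{Success, Try}

object UnitSuccessExample {
  // Warns under -Ywarn-adapted-args: the compiler silently rewrites the
  // empty argument list to Success(()).
  // val adapted: Try[Unit] = Success()

  // Explicit Unit value: same result, no adaptation warning.
  val explicit: Try[Unit] = Success(())

  def main(args: Array[String]): Unit = println(explicit)  // Success(())
}
```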
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 55f4190680..2293c11dad 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
import java.util.Properties
+import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.language.reflectiveCalls
import scala.util.control.NonFatal
@@ -67,7 +68,7 @@ class MyRDD(
numPartitions: Int,
dependencies: List[Dependency[_]],
locations: Seq[Seq[String]] = Nil,
- @transient tracker: MapOutputTrackerMaster = null)
+ @(transient @param) tracker: MapOutputTrackerMaster = null)
extends RDD[(Int, Int)](sc, dependencies) with Serializable {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
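In the hunk above, `tracker` is a plain constructor parameter, so a bare `@transient` has no generated field to attach to and scalac warns that the annotation is discarded; scoping it with the `@param` meta-annotation pins it to the parameter itself and silences the warning. A minimal sketch with made-up names (not the Spark classes):

```scala
import scala.annotation.meta.param

// Without the @param meta-annotation, scalac warns that @transient has no
// valid target on a plain constructor parameter and is discarded unused.
// @(transient @param) makes the intended target explicit.
class Holder(@(transient @param) helper: Object) extends Serializable {
  def describe(): String = String.valueOf(helper)
}
```

The same fix is applied to `SerializableClassWithWriteReplace` in the next hunk.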
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
index bdee889cdc..f019b1e259 100644
--- a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.serializer
import java.io._
+import scala.annotation.meta.param
+
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
@@ -219,7 +221,7 @@ class SerializableClassWithWriteObject(val objectField: Object) extends Serializ
}
-class SerializableClassWithWriteReplace(@transient replacementFieldObject: Object)
+class SerializableClassWithWriteReplace(@(transient @param) replacementFieldObject: Object)
extends Serializable {
private def writeReplace(): Object = {
replacementFieldObject
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index d30eafd2d4..4d938d5c97 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -196,7 +196,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
test("file appender async close stream abruptly") {
// Test FileAppender reaction to closing InputStream using a mock logging appender
val mockAppender = mock(classOf[Appender])
- val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+ val loggingEventCaptor = ArgumentCaptor.forClass(classOf[LoggingEvent])
// Make sure only logging errors
val logger = Logger.getRootLogger
@@ -223,7 +223,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
test("file appender async close stream gracefully") {
// Test FileAppender reaction to closing InputStream using a mock logging appender
val mockAppender = mock(classOf[Appender])
- val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+ val loggingEventCaptor = ArgumentCaptor.forClass(classOf[LoggingEvent])
// Make sure only logging errors
val logger = Logger.getRootLogger
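Both FileAppenderSuite hunks switch from constructing Mockito's `ArgumentCaptor` directly to the `ArgumentCaptor.forClass` factory; the direct constructor is deprecated in the Mockito line Spark used at the time, so the factory form is presumably what removes a build warning here. A sketch of just the captor construction, with the surrounding mock/verify calls omitted:

```scala
import org.apache.log4j.spi.LoggingEvent
import org.mockito.ArgumentCaptor

object CaptorExample {
  // forClass is the supported factory; `new ArgumentCaptor[LoggingEvent]`
  // still compiles against Mockito 1.x but is flagged as deprecated.
  val loggingEventCaptor: ArgumentCaptor[LoggingEvent] =
    ArgumentCaptor.forClass(classOf[LoggingEvent])
}
```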
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index de6f408fa8..6a2d4c9f2c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -853,7 +853,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
if (hasHadoopInput) {
val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
inputMetrics.setBytesRead(d + e + f)
- inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
+ inputMetrics.incRecordsReadInternal(if (hasRecords) (d + e + f) / 100 else -1)
} else {
val sr = t.registerTempShuffleReadMetrics()
sr.incRemoteBytesRead(b + d)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
index c12f784471..dda8bee222 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util.collection.unsafe.sort
+import java.nio.charset.StandardCharsets
+
import com.google.common.primitives.UnsignedBytes
import org.scalatest.prop.PropertyChecks
@@ -87,10 +89,12 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
// scalastyle:on
forAll (regressionTests) { (s1: String, s2: String) =>
- testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8"))
+ testPrefixComparison(
+ s1.getBytes(StandardCharsets.UTF_8), s2.getBytes(StandardCharsets.UTF_8))
}
forAll { (s1: String, s2: String) =>
- testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8"))
+ testPrefixComparison(
+ s1.getBytes(StandardCharsets.UTF_8), s2.getBytes(StandardCharsets.UTF_8))
}
}