author     zsxwing <zsxwing@gmail.com>          2014-10-28 14:26:57 -0700
committer  Reynold Xin <rxin@databricks.com>    2014-10-28 14:26:57 -0700
commit     abcafcfba38d7c8dba68a5510475c5c49ae54d92 (patch)
tree       26336d2770d7d9a033bbe9f1c1dea6fa5bbbae1d /core
parent     47a40f60d62ea69b659959994918d4c640f39d5b (diff)
download   spark-abcafcfba38d7c8dba68a5510475c5c49ae54d92.tar.gz
           spark-abcafcfba38d7c8dba68a5510475c5c49ae54d92.tar.bz2
           spark-abcafcfba38d7c8dba68a5510475c5c49ae54d92.zip
[SPARK-3922] Refactor spark-core to use Utils.UTF_8

A global UTF-8 constant is very helpful for handling encoding problems when converting between String and bytes. There are several options here:

1. Add `val UTF_8 = Charset.forName("UTF-8")` to Utils.scala
2. java.nio.charset.StandardCharsets.UTF_8 (requires JDK 7)
3. io.netty.util.CharsetUtil.UTF_8
4. com.google.common.base.Charsets.UTF_8
5. org.apache.commons.lang.CharEncoding.UTF_8
6. org.apache.commons.lang3.CharEncoding.UTF_8

IMO, I prefer option 1 because people can find it easily. This PR implements option 1 and only fixes Spark Core.

Author: zsxwing <zsxwing@gmail.com>

Closes #2781 from zsxwing/SPARK-3922 and squashes the following commits:

f974edd [zsxwing] Merge branch 'master' into SPARK-3922
2d27423 [zsxwing] Refactor spark-core to use Utils.UTF_8
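For context, below is a minimal, hypothetical sketch of what option 1 could look like. The object and method names are illustrative only (a real change would live in Utils.scala), and note that the hunks in this diff actually import com.google.common.base.Charsets.UTF_8 rather than defining a constant on Utils.

```scala
package org.apache.spark.util

import java.nio.charset.Charset

// Hypothetical sketch of option 1: a single shared UTF-8 constant,
// so call sites stop hard-coding the charset name "utf-8".
object UtilsSketch {
  val UTF_8: Charset = Charset.forName("UTF-8")
}

object UtilsSketchExample {
  // Convert between String and bytes without relying on the platform default charset.
  def roundTrip(s: String): String = {
    val bytes: Array[Byte] = s.getBytes(UtilsSketch.UTF_8)
    new String(bytes, UtilsSketch.UTF_8)
  }
}
```

Whichever option is chosen, call sites change from str.getBytes("utf-8") to str.getBytes(UTF_8), which is exactly the pattern the hunks below apply.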
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkSaslClient.scala                                      |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkSaslServer.scala                                      | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala                                 |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala         |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala                           |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala                         |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala             |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala      |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala                     |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala   |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala                        |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/Message.scala                                  |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala         |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala                               | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala                                      | 12
16 files changed, 55 insertions(+), 46 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
index 65003b6ac6..a954fcc0c3 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
@@ -17,7 +17,6 @@
package org.apache.spark
-import java.io.IOException
import javax.security.auth.callback.Callback
import javax.security.auth.callback.CallbackHandler
import javax.security.auth.callback.NameCallback
@@ -31,6 +30,8 @@ import javax.security.sasl.SaslException
import scala.collection.JavaConversions.mapAsJavaMap
+import com.google.common.base.Charsets.UTF_8
+
/**
* Implements SASL Client logic for Spark
*/
@@ -111,10 +112,10 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg
CallbackHandler {
private val userName: String =
- SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8"))
+ SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes(UTF_8))
private val secretKey = securityMgr.getSecretKey()
private val userPassword: Array[Char] = SparkSaslServer.encodePassword(
- if (secretKey != null) secretKey.getBytes("utf-8") else "".getBytes("utf-8"))
+ if (secretKey != null) secretKey.getBytes(UTF_8) else "".getBytes(UTF_8))
/**
* Implementation used to respond to SASL request from the server.
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
index f6b0a9132a..7c2afb3646 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
@@ -28,6 +28,8 @@ import javax.security.sasl.Sasl
import javax.security.sasl.SaslException
import javax.security.sasl.SaslServer
import scala.collection.JavaConversions.mapAsJavaMap
+
+import com.google.common.base.Charsets.UTF_8
import org.apache.commons.net.util.Base64
/**
@@ -89,7 +91,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
extends CallbackHandler {
private val userName: String =
- SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8"))
+ SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes(UTF_8))
override def handle(callbacks: Array[Callback]) {
logDebug("In the sasl server callback handler")
@@ -101,7 +103,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
case pc: PasswordCallback => {
logDebug("handle: SASL server callback: setting userPassword")
val password: Array[Char] =
- SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes("utf-8"))
+ SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes(UTF_8))
pc.setPassword(password)
}
case rc: RealmCallback => {
@@ -159,7 +161,7 @@ private[spark] object SparkSaslServer {
* @return Base64-encoded string
*/
def encodeIdentifier(identifier: Array[Byte]): String = {
- new String(Base64.encodeBase64(identifier), "utf-8")
+ new String(Base64.encodeBase64(identifier), UTF_8)
}
/**
@@ -168,7 +170,7 @@ private[spark] object SparkSaslServer {
* @return password as a char array.
*/
def encodePassword(password: Array[Byte]): Array[Char] = {
- new String(Base64.encodeBase64(password), "utf-8").toCharArray()
+ new String(Base64.encodeBase64(password), UTF_8).toCharArray()
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 163dca6cad..61b125ef7c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.python
import java.io._
import java.net._
-import java.nio.charset.Charset
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import scala.collection.JavaConversions._
@@ -27,6 +26,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.existentials
+import com.google.common.base.Charsets.UTF_8
import net.razorvine.pickle.{Pickler, Unpickler}
import org.apache.hadoop.conf.Configuration
@@ -134,7 +134,7 @@ private[spark] class PythonRDD(
val exLength = stream.readInt()
val obj = new Array[Byte](exLength)
stream.readFully(obj)
- throw new PythonException(new String(obj, "utf-8"),
+ throw new PythonException(new String(obj, UTF_8),
writerThread.exception.getOrElse(null))
case SpecialLengths.END_OF_DATA_SECTION =>
// We've finished the data section of the output, but we can still
@@ -318,7 +318,6 @@ private object SpecialLengths {
}
private[spark] object PythonRDD extends Logging {
- val UTF8 = Charset.forName("UTF-8")
// remember the broadcasts sent to each worker
private val workerBroadcasts = new mutable.WeakHashMap[Socket, mutable.Set[Long]]()
@@ -586,7 +585,7 @@ private[spark] object PythonRDD extends Logging {
}
def writeUTF(str: String, dataOut: DataOutputStream) {
- val bytes = str.getBytes(UTF8)
+ val bytes = str.getBytes(UTF_8)
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
@@ -849,7 +848,7 @@ private[spark] object PythonRDD extends Logging {
private
class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
- override def call(arr: Array[Byte]) : String = new String(arr, PythonRDD.UTF8)
+ override def call(arr: Array[Byte]) : String = new String(arr, UTF_8)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index d11db978b8..e9ca9166eb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -18,7 +18,8 @@
package org.apache.spark.api.python
import java.io.{DataOutput, DataInput}
-import java.nio.charset.Charset
+
+import com.google.common.base.Charsets.UTF_8
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
@@ -136,7 +137,7 @@ object WriteInputFormatTestDataGenerator {
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
- sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+ sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(UTF_8)) }
).saveAsSequenceFile(bytesPath)
val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
sc.parallelize(bools).saveAsSequenceFile(boolPath)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 9f99117625..3bf0b9492d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import akka.actor.ActorRef
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}
@@ -178,7 +178,7 @@ private[spark] class DriverRunner(
val stderr = new File(baseDir, "stderr")
val header = "Launch Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
- Files.append(header, stderr, Charsets.UTF_8)
+ Files.append(header, stderr, UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 71d7385b08..030a651469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.worker
import java.io._
import akka.actor.ActorRef
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
@@ -151,7 +151,7 @@ private[spark] class ExecutorRunner(
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
- Files.write(header, stderr, Charsets.UTF_8)
+ Files.write(header, stderr, UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
state = ExecutorState.RUNNING
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
index 5aea7ba2f3..3ab13b96d7 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
@@ -19,13 +19,13 @@ package org.apache.spark.network.netty.client
import java.util.concurrent.TimeoutException
+import com.google.common.base.Charsets.UTF_8
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.PooledByteBufAllocator
import io.netty.channel.socket.SocketChannel
import io.netty.channel.{ChannelFutureListener, ChannelFuture, ChannelInitializer, ChannelOption}
import io.netty.handler.codec.LengthFieldBasedFrameDecoder
import io.netty.handler.codec.string.StringEncoder
-import io.netty.util.CharsetUtil
import org.apache.spark.Logging
@@ -61,7 +61,7 @@ class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String,
b.handler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline
- .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
+ .addLast("encoder", new StringEncoder(UTF_8))
// maxFrameLength = 2G, lengthFieldOffset = 0, lengthFieldLength = 4
.addLast("framedLengthDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4))
.addLast("handler", handler)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
index 83265b1642..d9d3f7bef0 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
@@ -17,6 +17,7 @@
package org.apache.spark.network.netty.client
+import com.google.common.base.Charsets.UTF_8
import io.netty.buffer.ByteBuf
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
@@ -67,7 +68,7 @@ class BlockFetchingClientHandler extends SimpleChannelInboundHandler[ByteBuf] wi
val blockIdLen = in.readInt()
val blockIdBytes = new Array[Byte](math.abs(blockIdLen))
in.readBytes(blockIdBytes)
- val blockId = new String(blockIdBytes)
+ val blockId = new String(blockIdBytes, UTF_8)
val blockSize = totalLen - math.abs(blockIdLen) - 4
def server = ctx.channel.remoteAddress.toString
@@ -76,7 +77,7 @@ class BlockFetchingClientHandler extends SimpleChannelInboundHandler[ByteBuf] wi
if (blockIdLen < 0) {
val errorMessageBytes = new Array[Byte](blockSize)
in.readBytes(errorMessageBytes)
- val errorMsg = new String(errorMessageBytes)
+ val errorMsg = new String(errorMessageBytes, UTF_8)
logTrace(s"Received block $blockId ($blockSize B) with error $errorMsg from $server")
val listener = outstandingRequests.get(blockId)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
index 7b2f9a8d4d..9194c7ced3 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.network.netty.server
import java.net.InetSocketAddress
+import com.google.common.base.Charsets.UTF_8
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.PooledByteBufAllocator
import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
@@ -30,7 +31,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.socket.oio.OioServerSocketChannel
import io.netty.handler.codec.LineBasedFrameDecoder
import io.netty.handler.codec.string.StringDecoder
-import io.netty.util.CharsetUtil
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.network.netty.NettyConfig
@@ -131,7 +131,7 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Lo
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline
.addLast("frameDecoder", new LineBasedFrameDecoder(1024)) // max block id length 1024
- .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
+ .addLast("stringDecoder", new StringDecoder(UTF_8))
.addLast("blockHeaderEncoder", new BlockHeaderEncoder)
.addLast("handler", new BlockServerHandler(dataProvider))
}
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
index cc70bd0c5c..188154d51d 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
@@ -17,13 +17,13 @@
package org.apache.spark.network.netty.server
+import com.google.common.base.Charsets.UTF_8
import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.LineBasedFrameDecoder
import io.netty.handler.codec.string.StringDecoder
-import io.netty.util.CharsetUtil
-import org.apache.spark.storage.BlockDataProvider
+import org.apache.spark.storage.BlockDataProvider
/** Channel initializer that sets up the pipeline for the BlockServer. */
private[netty]
@@ -33,7 +33,7 @@ class BlockServerChannelInitializer(dataProvider: BlockDataProvider)
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline
.addLast("frameDecoder", new LineBasedFrameDecoder(1024)) // max block id length 1024
- .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
+ .addLast("stringDecoder", new StringDecoder(UTF_8))
.addLast("blockHeaderEncoder", new BlockHeaderEncoder)
.addLast("handler", new BlockServerHandler(dataProvider))
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index bda4bf5093..8408b75bb4 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -31,6 +31,8 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.language.postfixOps
+import com.google.common.base.Charsets.UTF_8
+
import org.apache.spark._
import org.apache.spark.util.Utils
@@ -923,7 +925,7 @@ private[nio] class ConnectionManager(
val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
errorMsgByteBuf.get(errorMsgBytes)
- val errorMsg = new String(errorMsgBytes, "utf-8")
+ val errorMsg = new String(errorMsgBytes, UTF_8)
val e = new IOException(
s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
if (!promise.tryFailure(e)) {
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Message.scala b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
index 3ad04591da..fb4a979b82 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
+import com.google.common.base.Charsets.UTF_8
+
import org.apache.spark.util.Utils
private[nio] abstract class Message(val typ: Long, val id: Int) {
@@ -92,7 +94,7 @@ private[nio] object Message {
*/
def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
val exceptionString = Utils.exceptionString(exception)
- val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes("utf-8"))
+ val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes(UTF_8))
val errorMessage = createBufferMessage(serializedExceptionString, ackId)
errorMessage.hasError = true
errorMessage
diff --git a/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
index 903ab09ae4..f629322ff6 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.network.netty.client
import java.nio.ByteBuffer
+import com.google.common.base.Charsets.UTF_8
import io.netty.buffer.Unpooled
import io.netty.channel.embedded.EmbeddedChannel
@@ -42,7 +43,7 @@ class BlockFetchingClientHandlerSuite extends FunSuite with PrivateMethodTester
parsedBlockId = bid
val bytes = new Array[Byte](refCntBuf.byteBuffer().remaining)
refCntBuf.byteBuffer().get(bytes)
- parsedBlockData = new String(bytes)
+ parsedBlockData = new String(bytes, UTF_8)
}
}
)
diff --git a/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
index 3ee281cb13..3f8d0cf8f3 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
@@ -17,12 +17,12 @@
package org.apache.spark.network.netty.server
+import com.google.common.base.Charsets.UTF_8
import io.netty.buffer.ByteBuf
import io.netty.channel.embedded.EmbeddedChannel
import org.scalatest.FunSuite
-
class BlockHeaderEncoderSuite extends FunSuite {
test("encode normal block data") {
@@ -35,7 +35,7 @@ class BlockHeaderEncoderSuite extends FunSuite {
val blockIdBytes = new Array[Byte](blockId.length)
out.readBytes(blockIdBytes)
- assert(new String(blockIdBytes) === blockId)
+ assert(new String(blockIdBytes, UTF_8) === blockId)
assert(out.readableBytes() === 0)
channel.close()
@@ -52,11 +52,11 @@ class BlockHeaderEncoderSuite extends FunSuite {
val blockIdBytes = new Array[Byte](blockId.length)
out.readBytes(blockIdBytes)
- assert(new String(blockIdBytes) === blockId)
+ assert(new String(blockIdBytes, UTF_8) === blockId)
val errorMsgBytes = new Array[Byte](errorMsg.length)
out.readBytes(errorMsgBytes)
- assert(new String(errorMsgBytes) === errorMsg)
+ assert(new String(errorMsgBytes, UTF_8) === errorMsg)
assert(out.readableBytes() === 0)
channel.close()
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index d2bee448d4..4dc5b6103d 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -18,13 +18,13 @@
package org.apache.spark.util
import java.io._
-import java.nio.charset.Charset
import scala.collection.mutable.HashSet
import scala.reflect._
import org.scalatest.{BeforeAndAfter, FunSuite}
+import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.spark.{Logging, SparkConf}
@@ -44,11 +44,11 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
test("basic file appender") {
val testString = (1 to 1000).mkString(", ")
- val inputStream = new ByteArrayInputStream(testString.getBytes(Charset.forName("UTF-8")))
+ val inputStream = new ByteArrayInputStream(testString.getBytes(UTF_8))
val appender = new FileAppender(inputStream, testFile)
inputStream.close()
appender.awaitTermination()
- assert(Files.toString(testFile, Charset.forName("UTF-8")) === testString)
+ assert(Files.toString(testFile, UTF_8) === testString)
}
test("rolling file appender - time-based rolling") {
@@ -96,7 +96,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
val allGeneratedFiles = new HashSet[String]()
val items = (1 to 10).map { _.toString * 10000 }
for (i <- 0 until items.size) {
- testOutputStream.write(items(i).getBytes(Charset.forName("UTF-8")))
+ testOutputStream.write(items(i).getBytes(UTF_8))
testOutputStream.flush()
allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles(
testFile.getParentFile.toString, testFile.getName).map(_.toString)
@@ -199,7 +199,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
// send data to appender through the input stream, and wait for the data to be written
val expectedText = textToAppend.mkString("")
for (i <- 0 until textToAppend.size) {
- outputStream.write(textToAppend(i).getBytes(Charset.forName("UTF-8")))
+ outputStream.write(textToAppend(i).getBytes(UTF_8))
outputStream.flush()
Thread.sleep(sleepTimeBetweenTexts)
}
@@ -214,7 +214,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
assert(generatedFiles.size > 1)
val allText = generatedFiles.map { file =>
- Files.toString(file, Charset.forName("UTF-8"))
+ Files.toString(file, UTF_8)
}.mkString("")
assert(allText === expectedText)
generatedFiles
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index ea7ef0524d..65579bb9af 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,7 +23,7 @@ import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStr
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.scalatest.FunSuite
@@ -118,7 +118,7 @@ class UtilsSuite extends FunSuite {
tmpDir2.deleteOnExit()
val f1Path = tmpDir2 + "/f1"
val f1 = new FileOutputStream(f1Path)
- f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))
+ f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(UTF_8))
f1.close()
// Read first few bytes
@@ -146,9 +146,9 @@ class UtilsSuite extends FunSuite {
val tmpDir = Utils.createTempDir()
tmpDir.deleteOnExit()
val files = (1 to 3).map(i => new File(tmpDir, i.toString))
- Files.write("0123456789", files(0), Charsets.UTF_8)
- Files.write("abcdefghij", files(1), Charsets.UTF_8)
- Files.write("ABCDEFGHIJ", files(2), Charsets.UTF_8)
+ Files.write("0123456789", files(0), UTF_8)
+ Files.write("abcdefghij", files(1), UTF_8)
+ Files.write("ABCDEFGHIJ", files(2), UTF_8)
// Read first few bytes in the 1st file
assert(Utils.offsetBytes(files, 0, 5) === "01234")
@@ -339,7 +339,7 @@ class UtilsSuite extends FunSuite {
try {
System.setProperty("spark.test.fileNameLoadB", "2")
Files.write("spark.test.fileNameLoadA true\n" +
- "spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
+ "spark.test.fileNameLoadB 1\n", outFile, UTF_8)
val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
properties
.filter { case (k, v) => k.startsWith("spark.")}