author | zsxwing <zsxwing@gmail.com> | 2014-10-28 14:26:57 -0700
---|---|---
committer | Reynold Xin <rxin@databricks.com> | 2014-10-28 14:26:57 -0700
commit | abcafcfba38d7c8dba68a5510475c5c49ae54d92 (patch)
tree | 26336d2770d7d9a033bbe9f1c1dea6fa5bbbae1d
parent | 47a40f60d62ea69b659959994918d4c640f39d5b (diff)
download | spark-abcafcfba38d7c8dba68a5510475c5c49ae54d92.tar.gz, spark-abcafcfba38d7c8dba68a5510475c5c49ae54d92.tar.bz2, spark-abcafcfba38d7c8dba68a5510475c5c49ae54d92.zip
[SPARK-3922] Refactor spark-core to use Utils.UTF_8
A global UTF-8 charset constant is very helpful for handling encoding problems when converting between String and bytes. There are several possible solutions:
1. Add `val UTF_8 = Charset.forName("UTF-8")` to Utils.scala
2. `java.nio.charset.StandardCharsets.UTF_8` (requires JDK 7)
3. `io.netty.util.CharsetUtil.UTF_8`
4. `com.google.common.base.Charsets.UTF_8`
5. `org.apache.commons.lang.CharEncoding.UTF_8`
6. `org.apache.commons.lang3.CharEncoding.UTF_8`
IMO, I prefer option 1 because it is easy for people to find.
This is a PR for option 1 and it only fixes Spark Core; a short sketch of that option follows.
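For reference, here is a minimal, self-contained sketch of what option 1 could look like. The `Demo` object and its sample string are illustrative only, not part of this patch, and note that the diff below actually imports `com.google.common.base.Charsets.UTF_8` (option 4) at each call site:

import java.nio.charset.Charset

// Option 1 sketch: a single shared UTF-8 Charset constant.
// Charset.forName works on JDK 6, unlike
// java.nio.charset.StandardCharsets.UTF_8 (JDK 7+).
object Utils {
  val UTF_8: Charset = Charset.forName("UTF-8")
}

object Demo extends App {
  // Passing a Charset avoids the checked UnsupportedEncodingException
  // that the String-based overload getBytes("utf-8") declares, and
  // guarantees every call site uses the same charset instead of
  // assorted "utf-8" string literals.
  val bytes: Array[Byte] = "block-id-123".getBytes(Utils.UTF_8)
  println(new String(bytes, Utils.UTF_8)) // prints: block-id-123
}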
Author: zsxwing <zsxwing@gmail.com>
Closes #2781 from zsxwing/SPARK-3922 and squashes the following commits:
f974edd [zsxwing] Merge branch 'master' into SPARK-3922
2d27423 [zsxwing] Refactor spark-core to use Utils.UTF_8
16 files changed, 55 insertions, 46 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
index 65003b6ac6..a954fcc0c3 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
@@ -17,7 +17,6 @@
 package org.apache.spark
 
-import java.io.IOException
 import javax.security.auth.callback.Callback
 import javax.security.auth.callback.CallbackHandler
 import javax.security.auth.callback.NameCallback
@@ -31,6 +30,8 @@
 import javax.security.sasl.SaslException
 
 import scala.collection.JavaConversions.mapAsJavaMap
 
+import com.google.common.base.Charsets.UTF_8
+
 /**
  * Implements SASL Client logic for Spark
  */
@@ -111,10 +112,10 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg
     CallbackHandler {
 
     private val userName: String =
-      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8"))
+      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes(UTF_8))
     private val secretKey = securityMgr.getSecretKey()
     private val userPassword: Array[Char] = SparkSaslServer.encodePassword(
-      if (secretKey != null) secretKey.getBytes("utf-8") else "".getBytes("utf-8"))
+      if (secretKey != null) secretKey.getBytes(UTF_8) else "".getBytes(UTF_8))
 
     /**
      * Implementation used to respond to SASL request from the server.
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
index f6b0a9132a..7c2afb3646 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
@@ -28,6 +28,8 @@
 import javax.security.sasl.Sasl
 import javax.security.sasl.SaslException
 import javax.security.sasl.SaslServer
 import scala.collection.JavaConversions.mapAsJavaMap
+
+import com.google.common.base.Charsets.UTF_8
 import org.apache.commons.net.util.Base64
 
 /**
@@ -89,7 +91,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
     extends CallbackHandler {
 
     private val userName: String =
-      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8"))
+      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes(UTF_8))
 
     override def handle(callbacks: Array[Callback]) {
       logDebug("In the sasl server callback handler")
@@ -101,7 +103,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
         case pc: PasswordCallback => {
           logDebug("handle: SASL server callback: setting userPassword")
           val password: Array[Char] =
-            SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes("utf-8"))
+            SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes(UTF_8))
           pc.setPassword(password)
         }
         case rc: RealmCallback => {
@@ -159,7 +161,7 @@ private[spark] object SparkSaslServer {
    * @return Base64-encoded string
    */
   def encodeIdentifier(identifier: Array[Byte]): String = {
-    new String(Base64.encodeBase64(identifier), "utf-8")
+    new String(Base64.encodeBase64(identifier), UTF_8)
   }
 
   /**
@@ -168,7 +170,7 @@ private[spark] object SparkSaslServer {
    * @return password as a char array.
    */
   def encodePassword(password: Array[Byte]): Array[Char] = {
-    new String(Base64.encodeBase64(password), "utf-8").toCharArray()
+    new String(Base64.encodeBase64(password), UTF_8).toCharArray()
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 163dca6cad..61b125ef7c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,7 +19,6 @@
 package org.apache.spark.api.python
 
 import java.io._
 import java.net._
-import java.nio.charset.Charset
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
@@ -27,6 +26,7 @@
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 
+import com.google.common.base.Charsets.UTF_8
 import net.razorvine.pickle.{Pickler, Unpickler}
 
 import org.apache.hadoop.conf.Configuration
@@ -134,7 +134,7 @@ private[spark] class PythonRDD(
             val exLength = stream.readInt()
             val obj = new Array[Byte](exLength)
             stream.readFully(obj)
-            throw new PythonException(new String(obj, "utf-8"),
+            throw new PythonException(new String(obj, UTF_8),
               writerThread.exception.getOrElse(null))
           case SpecialLengths.END_OF_DATA_SECTION =>
             // We've finished the data section of the output, but we can still
@@ -318,7 +318,6 @@ private object SpecialLengths {
 }
 
 private[spark] object PythonRDD extends Logging {
-  val UTF8 = Charset.forName("UTF-8")
 
   // remember the broadcasts sent to each worker
   private val workerBroadcasts = new mutable.WeakHashMap[Socket, mutable.Set[Long]]()
@@ -586,7 +585,7 @@ private[spark] object PythonRDD extends Logging {
   }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
-    val bytes = str.getBytes(UTF8)
+    val bytes = str.getBytes(UTF_8)
     dataOut.writeInt(bytes.length)
     dataOut.write(bytes)
   }
@@ -849,7 +848,7 @@ private[spark] object PythonRDD extends Logging {
 
 private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
-  override def call(arr: Array[Byte]) : String = new String(arr, PythonRDD.UTF8)
+  override def call(arr: Array[Byte]) : String = new String(arr, UTF_8)
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index d11db978b8..e9ca9166eb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.api.python
 
 import java.io.{DataOutput, DataInput}
-import java.nio.charset.Charset
+
+import com.google.common.base.Charsets.UTF_8
 
 import org.apache.hadoop.io._
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
@@ -136,7 +137,7 @@ object WriteInputFormatTestDataGenerator {
     sc.parallelize(intKeys).saveAsSequenceFile(intPath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
-    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(UTF_8)) }
       ).saveAsSequenceFile(bytesPath)
     val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
     sc.parallelize(bools).saveAsSequenceFile(boolPath)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 9f99117625..3bf0b9492d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -23,7 +23,7 @@
 import scala.collection.JavaConversions._
 import scala.collection.Map
 
 import akka.actor.ActorRef
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileUtil, Path}
@@ -178,7 +178,7 @@ private[spark] class DriverRunner(
       val stderr = new File(baseDir, "stderr")
       val header = "Launch Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
-      Files.append(header, stderr, Charsets.UTF_8)
+      Files.append(header, stderr, UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
     }
     runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 71d7385b08..030a651469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -20,7 +20,7 @@
 package org.apache.spark.deploy.worker
 
 import java.io._
 
 import akka.actor.ActorRef
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 
 import org.apache.spark.{SparkConf, Logging}
@@ -151,7 +151,7 @@ private[spark] class ExecutorRunner(
     stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
 
     val stderr = new File(executorDir, "stderr")
-    Files.write(header, stderr, Charsets.UTF_8)
+    Files.write(header, stderr, UTF_8)
     stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
     state = ExecutorState.RUNNING
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
index 5aea7ba2f3..3ab13b96d7 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
@@ -19,13 +19,13 @@
 package org.apache.spark.network.netty.client
 
 import java.util.concurrent.TimeoutException
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.bootstrap.Bootstrap
 import io.netty.buffer.PooledByteBufAllocator
 import io.netty.channel.socket.SocketChannel
 import io.netty.channel.{ChannelFutureListener, ChannelFuture, ChannelInitializer, ChannelOption}
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder
 import io.netty.handler.codec.string.StringEncoder
-import io.netty.util.CharsetUtil
 
 import org.apache.spark.Logging
@@ -61,7 +61,7 @@ class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String,
   b.handler(new ChannelInitializer[SocketChannel] {
     override def initChannel(ch: SocketChannel): Unit = {
       ch.pipeline
-        .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
+        .addLast("encoder", new StringEncoder(UTF_8))
         // maxFrameLength = 2G, lengthFieldOffset = 0, lengthFieldLength = 4
         .addLast("framedLengthDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4))
         .addLast("handler", handler)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
index 83265b1642..d9d3f7bef0 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.netty.client
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.buffer.ByteBuf
 import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
@@ -67,7 +68,7 @@ class BlockFetchingClientHandler extends SimpleChannelInboundHandler[ByteBuf] wi
     val blockIdLen = in.readInt()
     val blockIdBytes = new Array[Byte](math.abs(blockIdLen))
     in.readBytes(blockIdBytes)
-    val blockId = new String(blockIdBytes)
+    val blockId = new String(blockIdBytes, UTF_8)
     val blockSize = totalLen - math.abs(blockIdLen) - 4
 
     def server = ctx.channel.remoteAddress.toString
@@ -76,7 +77,7 @@ class BlockFetchingClientHandler extends SimpleChannelInboundHandler[ByteBuf] wi
     if (blockIdLen < 0) {
       val errorMessageBytes = new Array[Byte](blockSize)
       in.readBytes(errorMessageBytes)
-      val errorMsg = new String(errorMessageBytes)
+      val errorMsg = new String(errorMessageBytes, UTF_8)
       logTrace(s"Received block $blockId ($blockSize B) with error $errorMsg from $server")
 
       val listener = outstandingRequests.get(blockId)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
index 7b2f9a8d4d..9194c7ced3 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.network.netty.server
 
 import java.net.InetSocketAddress
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.bootstrap.ServerBootstrap
 import io.netty.buffer.PooledByteBufAllocator
 import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
@@ -30,7 +31,6 @@
 import io.netty.channel.socket.nio.NioServerSocketChannel
 import io.netty.channel.socket.oio.OioServerSocketChannel
 import io.netty.handler.codec.LineBasedFrameDecoder
 import io.netty.handler.codec.string.StringDecoder
-import io.netty.util.CharsetUtil
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.network.netty.NettyConfig
@@ -131,7 +131,7 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Lo
       override def initChannel(ch: SocketChannel): Unit = {
         ch.pipeline
           .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block id length 1024
-          .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
+          .addLast("stringDecoder", new StringDecoder(UTF_8))
           .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
           .addLast("handler", new BlockServerHandler(dataProvider))
       }
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
index cc70bd0c5c..188154d51d 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
@@ -17,13 +17,13 @@
 
 package org.apache.spark.network.netty.server
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.channel.ChannelInitializer
 import io.netty.channel.socket.SocketChannel
 import io.netty.handler.codec.LineBasedFrameDecoder
 import io.netty.handler.codec.string.StringDecoder
-import io.netty.util.CharsetUtil
-import org.apache.spark.storage.BlockDataProvider
 
+import org.apache.spark.storage.BlockDataProvider
 
 /** Channel initializer that sets up the pipeline for the BlockServer. */
 private[netty]
@@ -33,7 +33,7 @@ class BlockServerChannelInitializer(dataProvider: BlockDataProvider)
   override def initChannel(ch: SocketChannel): Unit = {
     ch.pipeline
       .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block id length 1024
-      .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
+      .addLast("stringDecoder", new StringDecoder(UTF_8))
       .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
       .addLast("handler", new BlockServerHandler(dataProvider))
   }
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index bda4bf5093..8408b75bb4 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -31,6 +31,8 @@
 import scala.concurrent.duration._
 import scala.concurrent.{Await, ExecutionContext, Future, Promise}
 import scala.language.postfixOps
 
+import com.google.common.base.Charsets.UTF_8
+
 import org.apache.spark._
 import org.apache.spark.util.Utils
@@ -923,7 +925,7 @@ private[nio] class ConnectionManager(
           val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
           val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
           errorMsgByteBuf.get(errorMsgBytes)
-          val errorMsg = new String(errorMsgBytes, "utf-8")
+          val errorMsg = new String(errorMsgBytes, UTF_8)
           val e = new IOException(
             s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
           if (!promise.tryFailure(e)) {
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Message.scala b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
index 3ad04591da..fb4a979b82 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
@@ -22,6 +22,8 @@
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 
+import com.google.common.base.Charsets.UTF_8
+
 import org.apache.spark.util.Utils
 
 private[nio] abstract class Message(val typ: Long, val id: Int) {
@@ -92,7 +94,7 @@ private[nio] object Message {
    */
   def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
     val exceptionString = Utils.exceptionString(exception)
-    val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes("utf-8"))
+    val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes(UTF_8))
     val errorMessage = createBufferMessage(serializedExceptionString, ackId)
     errorMessage.hasError = true
     errorMessage
diff --git a/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
index 903ab09ae4..f629322ff6 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandlerSuite.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.network.netty.client
 
 import java.nio.ByteBuffer
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.buffer.Unpooled
 import io.netty.channel.embedded.EmbeddedChannel
@@ -42,7 +43,7 @@ class BlockFetchingClientHandlerSuite extends FunSuite with PrivateMethodTester
         parsedBlockId = bid
         val bytes = new Array[Byte](refCntBuf.byteBuffer().remaining)
         refCntBuf.byteBuffer().get(bytes)
-        parsedBlockData = new String(bytes)
+        parsedBlockData = new String(bytes, UTF_8)
       }
     }
   )
diff --git a/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
index 3ee281cb13..3f8d0cf8f3 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/server/BlockHeaderEncoderSuite.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.network.netty.server
 
+import com.google.common.base.Charsets.UTF_8
 import io.netty.buffer.ByteBuf
 import io.netty.channel.embedded.EmbeddedChannel
 
 import org.scalatest.FunSuite
 
-
 class BlockHeaderEncoderSuite extends FunSuite {
 
   test("encode normal block data") {
@@ -35,7 +35,7 @@ class BlockHeaderEncoderSuite extends FunSuite {
 
     val blockIdBytes = new Array[Byte](blockId.length)
     out.readBytes(blockIdBytes)
-    assert(new String(blockIdBytes) === blockId)
+    assert(new String(blockIdBytes, UTF_8) === blockId)
     assert(out.readableBytes() === 0)
 
     channel.close()
@@ -52,11 +52,11 @@ class BlockHeaderEncoderSuite extends FunSuite {
 
     val blockIdBytes = new Array[Byte](blockId.length)
     out.readBytes(blockIdBytes)
-    assert(new String(blockIdBytes) === blockId)
+    assert(new String(blockIdBytes, UTF_8) === blockId)
 
     val errorMsgBytes = new Array[Byte](errorMsg.length)
     out.readBytes(errorMsgBytes)
-    assert(new String(errorMsgBytes) === errorMsg)
+    assert(new String(errorMsgBytes, UTF_8) === errorMsg)
     assert(out.readableBytes() === 0)
 
     channel.close()
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index d2bee448d4..4dc5b6103d 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.util
 
 import java.io._
-import java.nio.charset.Charset
 
 import scala.collection.mutable.HashSet
 import scala.reflect._
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 
 import org.apache.spark.{Logging, SparkConf}
@@ -44,11 +44,11 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
 
   test("basic file appender") {
     val testString = (1 to 1000).mkString(", ")
-    val inputStream = new ByteArrayInputStream(testString.getBytes(Charset.forName("UTF-8")))
+    val inputStream = new ByteArrayInputStream(testString.getBytes(UTF_8))
     val appender = new FileAppender(inputStream, testFile)
     inputStream.close()
     appender.awaitTermination()
-    assert(Files.toString(testFile, Charset.forName("UTF-8")) === testString)
+    assert(Files.toString(testFile, UTF_8) === testString)
   }
 
   test("rolling file appender - time-based rolling") {
@@ -96,7 +96,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
     val allGeneratedFiles = new HashSet[String]()
     val items = (1 to 10).map { _.toString * 10000 }
     for (i <- 0 until items.size) {
-      testOutputStream.write(items(i).getBytes(Charset.forName("UTF-8")))
+      testOutputStream.write(items(i).getBytes(UTF_8))
       testOutputStream.flush()
       allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles(
         testFile.getParentFile.toString, testFile.getName).map(_.toString)
@@ -199,7 +199,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
     // send data to appender through the input stream, and wait for the data to be written
     val expectedText = textToAppend.mkString("")
     for (i <- 0 until textToAppend.size) {
-      outputStream.write(textToAppend(i).getBytes(Charset.forName("UTF-8")))
+      outputStream.write(textToAppend(i).getBytes(UTF_8))
       outputStream.flush()
       Thread.sleep(sleepTimeBetweenTexts)
     }
@@ -214,7 +214,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
     logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
     assert(generatedFiles.size > 1)
     val allText = generatedFiles.map { file =>
-      Files.toString(file, Charset.forName("UTF-8"))
+      Files.toString(file, UTF_8)
     }.mkString("")
     assert(allText === expectedText)
     generatedFiles
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index ea7ef0524d..65579bb9af 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,7 +23,7 @@ import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStr
 import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
 
-import com.google.common.base.Charsets
+import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 
 import org.scalatest.FunSuite
@@ -118,7 +118,7 @@ class UtilsSuite extends FunSuite {
     tmpDir2.deleteOnExit()
     val f1Path = tmpDir2 + "/f1"
     val f1 = new FileOutputStream(f1Path)
-    f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))
+    f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(UTF_8))
     f1.close()
 
     // Read first few bytes
@@ -146,9 +146,9 @@ class UtilsSuite extends FunSuite {
     val tmpDir = Utils.createTempDir()
     tmpDir.deleteOnExit()
     val files = (1 to 3).map(i => new File(tmpDir, i.toString))
-    Files.write("0123456789", files(0), Charsets.UTF_8)
-    Files.write("abcdefghij", files(1), Charsets.UTF_8)
-    Files.write("ABCDEFGHIJ", files(2), Charsets.UTF_8)
+    Files.write("0123456789", files(0), UTF_8)
+    Files.write("abcdefghij", files(1), UTF_8)
+    Files.write("ABCDEFGHIJ", files(2), UTF_8)
 
     // Read first few bytes in the 1st file
     assert(Utils.offsetBytes(files, 0, 5) === "01234")
@@ -339,7 +339,7 @@ class UtilsSuite extends FunSuite {
     try {
       System.setProperty("spark.test.fileNameLoadB", "2")
       Files.write("spark.test.fileNameLoadA true\n" +
-        "spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
+        "spark.test.fileNameLoadB 1\n", outFile, UTF_8)
       val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
       properties
         .filter { case (k, v) => k.startsWith("spark.")}