-rw-r--r--  bin/compute-classpath.cmd | 157
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkSaslClient.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkSaslServer.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/ReceiverTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/SenderTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 24
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala | 2
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 2
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 2
-rw-r--r--  repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala | 44
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala | 5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala | 6
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2
21 files changed, 185 insertions(+), 116 deletions(-)
diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
index 4f60bff19c..065553eb31 100644
--- a/bin/compute-classpath.cmd
+++ b/bin/compute-classpath.cmd
@@ -1,69 +1,88 @@
-@echo off
-
-rem
-rem Licensed to the Apache Software Foundation (ASF) under one or more
-rem contributor license agreements. See the NOTICE file distributed with
-rem this work for additional information regarding copyright ownership.
-rem The ASF licenses this file to You under the Apache License, Version 2.0
-rem (the "License"); you may not use this file except in compliance with
-rem the License. You may obtain a copy of the License at
-rem
-rem http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem Unless required by applicable law or agreed to in writing, software
-rem distributed under the License is distributed on an "AS IS" BASIS,
-rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem See the License for the specific language governing permissions and
-rem limitations under the License.
-rem
-
-rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
-rem script and the ExecutorRunner in standalone cluster mode.
-
-set SCALA_VERSION=2.10
-
-rem Figure out where the Spark framework is installed
-set FWDIR=%~dp0..\
-
-rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
-
-rem Build up classpath
-set CLASSPATH=%FWDIR%conf
-if exist "%FWDIR%RELEASE" (
- for %%d in ("%FWDIR%jars\spark-assembly*.jar") do (
- set ASSEMBLY_JAR=%%d
- )
-) else (
- for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
- set ASSEMBLY_JAR=%%d
- )
-)
-set CLASSPATH=%CLASSPATH%;%ASSEMBLY_JAR%
-
-if "x%SPARK_TESTING%"=="x1" (
- rem Add test clases to path
- set CLASSPATH=%CLASSPATH%;%FWDIR%core\target\scala-%SCALA_VERSION%\test-classes
- set CLASSPATH=%CLASSPATH%;%FWDIR%repl\target\scala-%SCALA_VERSION%\test-classes
- set CLASSPATH=%CLASSPATH%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\test-classes
- set CLASSPATH=%CLASSPATH%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\test-classes
- set CLASSPATH=%CLASSPATH%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\test-classes
-)
-
-rem Add hadoop conf dir - else FileSystem.*, etc fail
-rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
-rem the configurtion files.
-if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
- set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
-:no_hadoop_conf_dir
-
-if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
- set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
-:no_yarn_conf_dir
-
-rem A bit of a hack to allow calling this script within run2.cmd without seeing output
-if "%DONT_PRINT_CLASSPATH%"=="1" goto exit
-
-echo %CLASSPATH%
-
-:exit
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
+rem script and the ExecutorRunner in standalone cluster mode.
+
+set SCALA_VERSION=2.10
+
+rem Figure out where the Spark framework is installed
+set FWDIR=%~dp0..\
+
+rem Load environment variables from conf\spark-env.cmd, if it exists
+if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+
+rem Build up classpath
+set CLASSPATH=%FWDIR%conf
+if exist "%FWDIR%RELEASE" (
+ for %%d in ("%FWDIR%jars\spark-assembly*.jar") do (
+ set ASSEMBLY_JAR=%%d
+ )
+) else (
+ for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
+ set ASSEMBLY_JAR=%%d
+ )
+)
+
+set CLASSPATH=%CLASSPATH%;%ASSEMBLY_JAR%
+
+set SPARK_CLASSES=%FWDIR%core\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%repl\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%graphx\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%tools\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\classes
+
+set SPARK_TEST_CLASSES=%FWDIR%core\target\scala-%SCALA_VERSION%\test-classes
+set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%repl\target\scala-%SCALA_VERSION%\test-classes
+set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\test-classes
+set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\test-classes
+set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%graphx\target\scala-%SCALA_VERSION%\test-classes
+set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\test-classes
+set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\test-classes
+set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\test-classes
+set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\test-classes
+
+if "x%SPARK_TESTING%"=="x1" (
+ rem Add test classes to path - note, add SPARK_CLASSES and SPARK_TEST_CLASSES before CLASSPATH
+ rem so that local compilation takes precedence over the assembled jar
+ set CLASSPATH=%SPARK_CLASSES%;%SPARK_TEST_CLASSES%;%CLASSPATH%
+)
+
+rem Add hadoop conf dir - else FileSystem.*, etc fail
+rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
+rem the configuration files.
+if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir
+ set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR%
+:no_hadoop_conf_dir
+
+if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir
+ set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%
+:no_yarn_conf_dir
+
+rem A bit of a hack to allow calling this script within run2.cmd without seeing output
+if "%DONT_PRINT_CLASSPATH%"=="1" goto exit
+
+echo %CLASSPATH%
+
+:exit
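
The comment in the SPARK_TESTING branch above prepends SPARK_CLASSES and SPARK_TEST_CLASSES so that locally compiled classes win over the assembly jar. A minimal Scala sketch of the underlying JVM behaviour (not part of the patch; the object name and directory entries are placeholders): a URLClassLoader consults its classpath entries in order, so whichever entry appears first supplies any class that exists in both places.

    import java.io.File
    import java.net.URLClassLoader

    object ClasspathOrderDemo {
      def main(args: Array[String]): Unit = {
        // Placeholder entries: the first one shadows later ones for any duplicated class.
        val entries = Seq("core/target/scala-2.10/classes", "assembly/spark-assembly.jar")
        val urls = entries.map(e => new File(e).toURI.toURL).toArray
        val loader = new URLClassLoader(urls, null)
        // loadClass walks these URLs in order, so a class compiled locally is served
        // from the classes directory rather than from the assembly jar.
        println(loader.getURLs.mkString(File.pathSeparator))
      }
    }
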
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
index 5b14c4291d..65003b6ac6 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
@@ -111,10 +111,10 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg
CallbackHandler {
private val userName: String =
- SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes())
+ SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8"))
private val secretKey = securityMgr.getSecretKey()
- private val userPassword: Array[Char] =
- SparkSaslServer.encodePassword(if (secretKey != null) secretKey.getBytes() else "".getBytes())
+ private val userPassword: Array[Char] = SparkSaslServer.encodePassword(
+ if (secretKey != null) secretKey.getBytes("utf-8") else "".getBytes("utf-8"))
/**
* Implementation used to respond to SASL request from the server.
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
index 6161a6fb7a..f6b0a9132a 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
@@ -89,7 +89,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
extends CallbackHandler {
private val userName: String =
- SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes())
+ SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8"))
override def handle(callbacks: Array[Callback]) {
logDebug("In the sasl server callback handler")
@@ -101,7 +101,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi
case pc: PasswordCallback => {
logDebug("handle: SASL server callback: setting userPassword")
val password: Array[Char] =
- SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes())
+ SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes("utf-8"))
pc.setPassword(password)
}
case rc: RealmCallback => {
@@ -159,7 +159,7 @@ private[spark] object SparkSaslServer {
* @return Base64-encoded string
*/
def encodeIdentifier(identifier: Array[Byte]): String = {
- new String(Base64.encodeBase64(identifier))
+ new String(Base64.encodeBase64(identifier), "utf-8")
}
/**
@@ -168,7 +168,7 @@ private[spark] object SparkSaslServer {
* @return password as a char array.
*/
def encodePassword(password: Array[Byte]): Array[Char] = {
- new String(Base64.encodeBase64(password)).toCharArray()
+ new String(Base64.encodeBase64(password), "utf-8").toCharArray()
}
}
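
The SASL changes above (and the similar one-liners in PythonRDD, ReceiverTest, SenderTest, JavaAPISuite, MQTTWordCount, FlumeStreamSuite, MQTTInputDStream, ScriptTransformation, HiveComparisonTest and YarnSparkHadoopUtil below) all apply the same rule: never call getBytes() or new String(bytes) without naming a charset, because the no-argument forms use the platform default encoding, which can differ between a typical Linux box and Windows. A small illustrative sketch of the pitfall (not from the patch; the object name is a placeholder):

    import java.nio.charset.Charset

    object CharsetDemo {
      def main(args: Array[String]): Unit = {
        val s = "señal"                        // non-ASCII content
        println("default charset: " + Charset.defaultCharset())
        val platformBytes = s.getBytes         // length depends on the platform default
        val utf8Bytes = s.getBytes("utf-8")    // always 6 bytes for this string
        println(platformBytes.length + " vs " + utf8Bytes.length)
        // Decoding with the same explicit charset reproduces the original string.
        println(new String(utf8Bytes, "utf-8"))
      }
    }
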
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 1498b017a7..672c344a56 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -187,7 +187,7 @@ private[spark] class PythonRDD[T: ClassTag](
val exLength = stream.readInt()
val obj = new Array[Byte](exLength)
stream.readFully(obj)
- throw new PythonException(new String(obj), readerException)
+ throw new PythonException(new String(obj, "utf-8"), readerException)
case SpecialLengths.END_OF_DATA_SECTION =>
// We've finished the data section of the output, but we can still
// read some accumulator updates:
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 9dc51e0d40..53a6038a9b 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -28,7 +28,7 @@ private[spark] object ReceiverTest {
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
/* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis) */
- val buffer = ByteBuffer.wrap("response".getBytes)
+ val buffer = ByteBuffer.wrap("response".getBytes("utf-8"))
Some(Message.createBufferMessage(buffer, msg.id))
})
Thread.currentThread.join()
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 14c094c617..b8ea7c2cff 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -54,7 +54,7 @@ private[spark] object SenderTest {
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
.map { response =>
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
- new String(buffer.array)
+ new String(buffer.array, "utf-8")
}.getOrElse("none")
val finishTime = System.currentTimeMillis
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index e441d4a40c..5d77d37378 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -87,10 +87,10 @@ private[spark] class PipedRDD[T: ClassTag](
    // When spark.worker.separated.working.directory option is turned on, each
    // task will be run in a separate directory. This should resolve the file
    // access conflict issue
- val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString
+ val taskDirectory = "tasks" + File.separator + java.util.UUID.randomUUID.toString
var workInTaskDirectory = false
logDebug("taskDirectory = " + taskDirectory)
- if (separateWorkingDir == true) {
+ if (separateWorkingDir) {
val currentDir = new File(".")
logDebug("currentDir = " + currentDir.getAbsolutePath())
val taskDirFile = new File(taskDirectory)
@@ -106,13 +106,13 @@ private[spark] class PipedRDD[T: ClassTag](
for (file <- currentDir.list(tasksDirFilter)) {
val fileWithDir = new File(currentDir, file)
Utils.symlink(new File(fileWithDir.getAbsolutePath()),
- new File(taskDirectory + "/" + fileWithDir.getName()))
+ new File(taskDirectory + File.separator + fileWithDir.getName()))
}
pb.directory(taskDirFile)
workInTaskDirectory = true
} catch {
case e: Exception => logError("Unable to setup task working directory: " + e.getMessage +
- " (" + taskDirectory + ")")
+ " (" + taskDirectory + ")", e)
}
}
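
The PipedRDD hunks replace hard-coded "/" separators with File.separator when building the per-task working directory, so the paths also resolve on Windows. A short sketch of the same idea (illustrative only; the object name is made up), including the File(parent, child) constructor as an equivalent way to get a platform-correct path:

    import java.io.File
    import java.util.UUID

    object TaskDirDemo {
      def main(args: Array[String]): Unit = {
        // The pattern the patch moves to: no hard-coded "/" in the relative path.
        val taskDirectory = "tasks" + File.separator + UUID.randomUUID.toString
        // Equivalent, and often cleaner: let java.io.File join the components.
        val taskDirFile = new File("tasks", UUID.randomUUID.toString)
        println(taskDirectory)
        println(taskDirFile.getPath)
      }
    }
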
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 084a71c4ca..8351f7156a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -30,6 +30,7 @@ import scala.io.Source
import scala.reflect.ClassTag
import com.google.common.io.Files
+import org.apache.commons.lang.SystemUtils
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.json4s._
@@ -45,10 +46,13 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
*/
private[spark] object Utils extends Logging {
- val osName = System.getProperty("os.name")
-
val random = new Random()
+ def sparkBin(sparkHome: String, which: String): File = {
+ val suffix = if (SystemUtils.IS_OS_WINDOWS) ".cmd" else ""
+ new File(sparkHome + File.separator + "bin", which + suffix)
+ }
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -605,7 +609,7 @@ private[spark] object Utils extends Logging {
*/
def isSymlink(file: File): Boolean = {
if (file == null) throw new NullPointerException("File must not be null")
- if (osName.startsWith("Windows")) return false
+ if (SystemUtils.IS_OS_WINDOWS) return false
val fileInCanonicalDir = if (file.getParent() == null) {
file
} else {
@@ -1008,10 +1012,18 @@ private[spark] object Utils extends Logging {
if (dst.isAbsolute()) {
throw new IOException("Destination must be relative")
}
- val linkCmd = if (osName.startsWith("Windows")) "copy" else "ln -sf"
+ var cmdSuffix = ""
+ val linkCmd = if (SystemUtils.IS_OS_WINDOWS) {
+ // refer to http://technet.microsoft.com/en-us/library/cc771254.aspx
+ cmdSuffix = " /s /e /k /h /y /i"
+ "cmd /c xcopy "
+ } else {
+ cmdSuffix = ""
+ "ln -sf "
+ }
import scala.sys.process._
- (linkCmd + " " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line =>
- (logInfo(line)))
+ (linkCmd + src.getAbsolutePath() + " " + dst.getPath() + cmdSuffix) lines_!
+ ProcessLogger(line => (logInfo(line)))
}
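
Utils now detects the platform with commons-lang's SystemUtils.IS_OS_WINDOWS instead of matching os.name by hand, and Utils.symlink falls back to a recursive xcopy on Windows, where "ln -sf" is not available. A hedged sketch of that dispatch using scala.sys.process (illustrative only, not the patch's exact code; the helper name is made up and a Seq-based command is used to sidestep quoting):

    import java.io.File
    import org.apache.commons.lang.SystemUtils
    import scala.sys.process._

    object LinkOrCopy {
      // Hypothetical helper: symlink on Unix-like systems, recursive copy on Windows.
      def linkOrCopy(src: File, dst: File): Int = {
        val command =
          if (SystemUtils.IS_OS_WINDOWS) {
            // xcopy flags follow the patch: /s /e /k /h /y /i (see the technet link above).
            Seq("cmd", "/c", "xcopy", src.getAbsolutePath, dst.getPath,
              "/s", "/e", "/k", "/h", "/y", "/i")
          } else {
            Seq("ln", "-sf", src.getAbsolutePath, dst.getPath)
          }
        // Log each output line and return the exit code instead of throwing.
        command ! ProcessLogger(line => println(line))
      }
    }
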
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 8d2e9f1846..76c6f5af82 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -610,8 +610,8 @@ public class JavaAPISuite implements Serializable {
@Test
public void wholeTextFiles() throws IOException {
- byte[] content1 = "spark is easy to use.\n".getBytes();
- byte[] content2 = "spark is also easy to use.\n".getBytes();
+ byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+ byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8");
File tempDir = Files.createTempDir();
String tempDirName = tempDir.getAbsolutePath();
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 3d3c27ed78..62aef0fb47 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -56,7 +56,7 @@ object MQTTPublisher {
val msg: String = "hello mqtt demo for spark streaming"
while (true) {
- val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes())
+ val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8"))
msgtopic.publish(message)
println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 78603200d2..dd287d0ef9 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -57,7 +57,7 @@ class FlumeStreamSuite extends TestSuiteBase {
for (i <- 0 until input.size) {
val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
+ event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8")))
event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
client.append(event)
Thread.sleep(500)
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 0beee8b415..77661f71ad 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -92,7 +92,7 @@ class MQTTReceiver(
// Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
- store(new String(arg1.getPayload()))
+ store(new String(arg1.getPayload(), "utf-8"))
}
override def deliveryComplete(arg0: IMqttDeliveryToken) {
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 4155007c6d..e33f4f9803 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
+import org.apache.commons.lang3.StringEscapeUtils
class ReplSuite extends FunSuite {
@@ -185,11 +186,12 @@ class ReplSuite extends FunSuite {
out.close()
val output = runInterpreter("local",
"""
- |var file = sc.textFile("%s/input").cache()
+ |var file = sc.textFile("%s").cache()
|file.count()
|file.count()
|file.count()
- """.stripMargin.format(tempDir.getAbsolutePath))
+ """.stripMargin.format(StringEscapeUtils.escapeJava(
+ tempDir.getAbsolutePath + File.separator + "input")))
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res0: Long = 3", output)
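
The ReplSuite fix escapes the temp-directory path before splicing it into the generated interpreter input: a Windows path contains backslashes, which the REPL would otherwise read as escape sequences inside the string literal. A small sketch of the escaping step (illustrative; the path shown is made up):

    import org.apache.commons.lang3.StringEscapeUtils

    object EscapeDemo {
      def main(args: Array[String]): Unit = {
        // Example Windows-style path; the backslashes must survive as literal characters.
        val windowsStylePath = "C:\\Users\\me\\AppData\\Local\\Temp\\input"
        val snippet = """var file = sc.textFile("%s").cache()"""
          .format(StringEscapeUtils.escapeJava(windowsStylePath))
        println(snippet)  // the backslashes arrive doubled, so the literal parses cleanly
      }
    }
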
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index 5be76890af..4cd52d8288 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -200,10 +200,10 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
}
private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
- override def actualSize(v: String): Int = v.getBytes.length + 4
+ override def actualSize(v: String): Int = v.getBytes("utf-8").length + 4
override def append(v: String, buffer: ByteBuffer) {
- val stringBytes = v.getBytes()
+ val stringBytes = v.getBytes("utf-8")
buffer.putInt(stringBytes.length).put(stringBytes, 0, stringBytes.length)
}
@@ -211,7 +211,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
val length = buffer.getInt()
val stringBytes = new Array[Byte](length)
buffer.get(stringBytes, 0, length)
- new String(stringBytes)
+ new String(stringBytes, "utf-8")
}
override def setField(row: MutableRow, ordinal: Int, value: String) {
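
The STRING column type stores each value as a 4-byte length prefix followed by the string's UTF-8 bytes, and the change makes the charset explicit on both the append and extract sides so that actualSize, the written payload and the decoded result all agree. A self-contained round-trip sketch of that layout (illustrative only; the object name is a placeholder):

    import java.nio.ByteBuffer

    object StringColumnDemo {
      def main(args: Array[String]): Unit = {
        val value = "hello"
        val bytes = value.getBytes("utf-8")

        // Append: 4-byte length prefix, then the UTF-8 payload.
        val buffer = ByteBuffer.allocate(4 + bytes.length)
        buffer.putInt(bytes.length).put(bytes)
        buffer.rewind()

        // Extract: read the length, then exactly that many bytes, decoded as UTF-8.
        val length = buffer.getInt()
        val out = new Array[Byte](length)
        buffer.get(out)
        println(new String(out, "utf-8"))  // prints "hello"
      }
    }
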
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 1d3608ed2d..325173cf95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -58,7 +58,7 @@ class ColumnTypeSuite extends FunSuite {
checkActualSize(DOUBLE, Double.MaxValue, 8)
checkActualSize(FLOAT, Float.MaxValue, 4)
checkActualSize(BOOLEAN, true, 1)
- checkActualSize(STRING, "hello", 4 + 5)
+ checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
val binary = Array.fill[Byte](4)(0: Byte)
checkActualSize(BINARY, binary, 4 + 4)
@@ -91,14 +91,16 @@ class ColumnTypeSuite extends FunSuite {
testNativeColumnType[StringType.type](
STRING,
(buffer: ByteBuffer, string: String) => {
- val bytes = string.getBytes()
- buffer.putInt(bytes.length).put(string.getBytes)
+
+ val bytes = string.getBytes("utf-8")
+ buffer.putInt(bytes.length)
+ buffer.put(bytes)
},
(buffer: ByteBuffer) => {
val length = buffer.getInt()
val bytes = new Array[Byte](length)
- buffer.get(bytes, 0, length)
- new String(bytes)
+ buffer.get(bytes)
+ new String(bytes, "utf-8")
})
testColumnType[BinaryType.type, Array[Byte]](
@@ -161,9 +163,13 @@ class ColumnTypeSuite extends FunSuite {
buffer.rewind()
seq.foreach { expected =>
+ println("buffer = " + buffer + ", expected = " + expected)
+ val extracted = columnType.extract(buffer)
assert(
- expected === columnType.extract(buffer),
- "Extracted value didn't equal to the original one")
+ expected === extracted,
+ "Extracted value didn't equal to the original one. " +
+ hexDump(expected) + " != " + hexDump(extracted) +
+ ", buffer = " + dumpBuffer(buffer.duplicate().rewind().asInstanceOf[ByteBuffer]))
}
}
@@ -179,4 +185,28 @@ class ColumnTypeSuite extends FunSuite {
}
}
}
+
+ private def hexDump(value: Any): String = {
+ if (value.isInstanceOf[String]) {
+ val sb = new StringBuilder()
+ for (ch <- value.asInstanceOf[String].toCharArray) {
+ sb.append(Integer.toHexString(ch & 0xffff)).append(' ')
+ }
+ if (! sb.isEmpty) sb.setLength(sb.length - 1)
+ sb.toString()
+ } else {
+ // for now ..
+ hexDump(value.toString)
+ }
+ }
+
+ private def dumpBuffer(buff: ByteBuffer): Any = {
+ val sb = new StringBuilder()
+ while (buff.hasRemaining) {
+ val b = buff.get()
+ sb.append(Integer.toHexString(b & 0xff)).append(' ')
+ }
+ if (! sb.isEmpty) sb.setLength(sb.length - 1)
+ sb.toString()
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala
index 610fa9cb84..8258ee5fef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala
@@ -71,7 +71,7 @@ case class ScriptTransformation(
iter
.map(outputProjection)
// TODO: Use SerDe
- .map(_.mkString("", "\t", "\n").getBytes).foreach(outputStream.write)
+ .map(_.mkString("", "\t", "\n").getBytes("utf-8")).foreach(outputStream.write)
outputStream.close()
readerThread.join()
outputLines.toIterator
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 74110ee27b..3ad66a3d7f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -100,14 +100,15 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
hiveFilesTemp.delete()
hiveFilesTemp.mkdir()
- val inRepoTests = if (System.getProperty("user.dir").endsWith("sql/hive")) {
- new File("src/test/resources/")
+ val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
+ new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
} else {
- new File("sql/hive/src/test/resources")
+ new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
+ File.separator + "resources")
}
def getHiveFile(path: String): File = {
- val stripped = path.replaceAll("""\.\.\/""", "")
+ val stripped = path.replaceAll("""\.\.\/""", "").replace('/', File.separatorChar)
hiveDevHome
.map(new File(_, stripped))
.filter(_.exists)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
index 9b9a823b6e..42a82c1fbf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.test.TestHive._
* https://amplab.cs.berkeley.edu/benchmark/
*/
class BigDataBenchmarkSuite extends HiveComparisonTest {
- val testDataDirectory = new File("target/big-data-benchmark-testdata")
+ val testDataDirectory = new File("target" + File.separator + "big-data-benchmark-testdata")
val testTables = Seq(
TestTable(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index ea17e6e93b..edff38b901 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -78,7 +78,8 @@ abstract class HiveComparisonTest
.map(name => new File(targetDir, s"$suiteName.$name"))
/** The local directory with cached golden answer will be stored. */
- protected val answerCache = new File("src/test/resources/golden")
+ protected val answerCache = new File("src" + File.separator + "test" +
+ File.separator + "resources" + File.separator + "golden")
if (!answerCache.exists) {
answerCache.mkdir()
}
@@ -120,7 +121,7 @@ abstract class HiveComparisonTest
protected val cacheDigest = java.security.MessageDigest.getInstance("MD5")
protected def getMd5(str: String): String = {
val digest = java.security.MessageDigest.getInstance("MD5")
- digest.update(str.getBytes)
+ digest.update(str.getBytes("utf-8"))
new java.math.BigInteger(1, digest.digest).toString(16)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index dfe88b960b..0bb76f31c3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive.execution
+import java.io.File
+
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.hive.test.TestHive
@@ -26,7 +28,9 @@ import org.apache.spark.sql.hive.test.TestHive
*/
class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// TODO: bundle in jar files... get from classpath
- lazy val hiveQueryDir = TestHive.getHiveFile("ql/src/test/queries/clientpositive")
+ lazy val hiveQueryDir = TestHive.getHiveFile("ql" + File.separator + "src" +
+ File.separator + "test" + File.separator + "queries" + File.separator + "clientpositive")
+
def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
override def beforeAll() {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 832d45b3ad..718cb19f57 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -64,7 +64,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
override def addSecretKeyToUserCredentials(key: String, secret: String) {
val creds = new Credentials()
- creds.addSecretKey(new Text(key), secret.getBytes())
+ creds.addSecretKey(new Text(key), secret.getBytes("utf-8"))
addCurrentUserCredentials(creds)
}