diff options
author | Sean Owen <sowen@cloudera.com> | 2016-03-13 21:03:49 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-13 21:03:49 -0700 |
commit | 184085284185011d7cc6d054b54d2d38eaf1dd77 (patch) | |
tree | 7b068f5bcf02ea959ab3a49c49fbc1cdae979a26 /sql/core | |
parent | 473263f9598d1cf880f421aae1b51eb0b6e3cf79 (diff) | |
download | spark-184085284185011d7cc6d054b54d2d38eaf1dd77.tar.gz spark-184085284185011d7cc6d054b54d2d38eaf1dd77.tar.bz2 spark-184085284185011d7cc6d054b54d2d38eaf1dd77.zip |
[SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> byte[] conversions (and remaining Coverity items)
## What changes were proposed in this pull request?
- Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on platform default encoding, to use UTF-8
- Same for `InputStreamReader` and `OutputStreamWriter` constructors
- Standardizes on UTF-8 everywhere
- Standardizes specifying the encoding with `StandardCharsets.UTF-8`, not the Guava constant or "UTF-8" (which means handling `UnuspportedEncodingException`)
- (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit https://github.com/srowen/spark/commit/1deecd8d9ca986d8adb1a42d315890ce5349d29c )
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes #11657 from srowen/SPARK-13823.
Diffstat (limited to 'sql/core')
15 files changed, 53 insertions, 34 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 68f146f7a2..b084eda6f8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.vectorized; import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.util.Iterator; import java.util.List; @@ -138,7 +139,7 @@ public class ColumnVectorUtils { } else if (t == DataTypes.DoubleType) { dst.appendDouble(((Double)o).doubleValue()); } else if (t == DataTypes.StringType) { - byte[] b =((String)o).getBytes(); + byte[] b =((String)o).getBytes(StandardCharsets.UTF_8); dst.appendByteArray(b, 0, b.length); } else if (t instanceof DecimalType) { DecimalType dt = (DecimalType)t; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index 9a8aedfa56..09c001baae 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.vectorized; import java.util.Arrays; import java.util.Iterator; +import java.util.NoSuchElementException; import org.apache.commons.lang.NotImplementedException; @@ -254,6 +255,9 @@ public final class ColumnarBatch { while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) { ++rowId; } + if (rowId >= maxRows) { + throw new NoSuchElementException(); + } row.rowId = rowId++; return row; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 38aa2dd80a..6a0290c112 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.csv -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets import org.apache.spark.Logging import org.apache.spark.sql.execution.datasources.CompressionCodecs @@ -64,7 +64,7 @@ private[sql] class CSVOptions( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode = parameters.getOrElse("mode", "PERMISSIVE") val charset = parameters.getOrElse("encoding", - parameters.getOrElse("charset", Charset.forName("UTF-8").name())) + parameters.getOrElse("charset", StandardCharsets.UTF_8.name())) val quote = getChar("quote", '\"') val escape = getChar("escape", '\\') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index 8f1421844c..8c3f63d307 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.{ByteArrayOutputStream, OutputStreamWriter, StringReader} +import java.nio.charset.StandardCharsets import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, CsvWriter, CsvWriterSettings} @@ -76,7 +77,7 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten def writeRow(row: Seq[String], includeHeader: Boolean): String = { val buffer = new ByteArrayOutputStream() - val outputWriter = new OutputStreamWriter(buffer) + val outputWriter = new OutputStreamWriter(buffer, StandardCharsets.UTF_8) val writer = new CsvWriter(outputWriter, writerSettings) if (includeHeader) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index aff672281d..42c07c8a23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.csv -import java.nio.charset.Charset +import java.nio.charset.{Charset, StandardCharsets} import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.io.{LongWritable, Text} @@ -161,7 +161,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { sqlContext: SQLContext, options: CSVOptions, location: String): RDD[String] = { - if (Charset.forName(options.charset) == Charset.forName("UTF-8")) { + if (Charset.forName(options.charset) == StandardCharsets.UTF_8) { sqlContext.sparkContext.textFile(location) } else { val charset = options.charset diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 14ba9f69bb..cce4b74ff2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.execution.streaming import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.io.Codec -import com.google.common.base.Charsets.UTF_8 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.Logging @@ -184,7 +184,7 @@ class FileStreamSource( private def writeBatch(id: Int, files: Seq[String]): Unit = { assert(files.nonEmpty, "create a new batch without any file") val output = fs.create(new Path(metadataPath + "/" + id), true) - val writer = new PrintWriter(new OutputStreamWriter(output, UTF_8)) + val writer = new PrintWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)) try { // scalastyle:off println writer.println(FileStreamSource.VERSION) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java index 0f9e453d26..9e65158eb0 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java @@ -40,7 +40,6 @@ public class JavaSaveLoadSuite { private transient JavaSparkContext sc; private transient SQLContext sqlContext; - String originalDefaultSource; File path; Dataset<Row> df; @@ -57,7 +56,6 @@ public class JavaSaveLoadSuite { sqlContext = new SQLContext(_sc); sc = new JavaSparkContext(_sc); - originalDefaultSource = sqlContext.conf().defaultDataSourceName(); path = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile(); if (path.exists()) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index aff9efe4b2..2aa6f8d4ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.nio.charset.StandardCharsets + import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -167,12 +169,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { } test("misc sha1 function") { - val df = Seq(("ABC", "ABC".getBytes)).toDF("a", "b") + val df = Seq(("ABC", "ABC".getBytes(StandardCharsets.UTF_8))).toDF("a", "b") checkAnswer( df.select(sha1($"a"), sha1($"b")), Row("3c01bdbb26f358bab27f267924aa2c9a03fcfdb8", "3c01bdbb26f358bab27f267924aa2c9a03fcfdb8")) - val dfEmpty = Seq(("", "".getBytes)).toDF("a", "b") + val dfEmpty = Seq(("", "".getBytes(StandardCharsets.UTF_8))).toDF("a", "b") checkAnswer( dfEmpty.selectExpr("sha1(a)", "sha1(b)"), Row("da39a3ee5e6b4b0d3255bfef95601890afd80709", "da39a3ee5e6b4b0d3255bfef95601890afd80709")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index e6e27ec413..2333fa27ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import java.io.File +import java.nio.charset.StandardCharsets import scala.language.postfixOps import scala.util.Random @@ -665,8 +666,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("showString: binary") { val df = Seq( - ("12".getBytes, "ABC.".getBytes), - ("34".getBytes, "12346".getBytes) + ("12".getBytes(StandardCharsets.UTF_8), "ABC.".getBytes(StandardCharsets.UTF_8)), + ("34".getBytes(StandardCharsets.UTF_8), "12346".getBytes(StandardCharsets.UTF_8)) ).toDF() val expectedAnswer = """+-------+----------------+ || _1| _2| diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala index 013a90875e..f5a67fd782 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.nio.charset.StandardCharsets + import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions.{log => logarithm} import org.apache.spark.sql.test.SharedSQLContext @@ -262,9 +264,9 @@ class MathExpressionsSuite extends QueryTest with SharedSQLContext { test("unhex") { val data = Seq(("1C", "737472696E67")).toDF("a", "b") checkAnswer(data.select(unhex('a)), Row(Array[Byte](28.toByte))) - checkAnswer(data.select(unhex('b)), Row("string".getBytes)) + checkAnswer(data.select(unhex('b)), Row("string".getBytes(StandardCharsets.UTF_8))) checkAnswer(data.selectExpr("unhex(a)"), Row(Array[Byte](28.toByte))) - checkAnswer(data.selectExpr("unhex(b)"), Row("string".getBytes)) + checkAnswer(data.selectExpr("unhex(b)"), Row("string".getBytes(StandardCharsets.UTF_8))) checkAnswer(data.selectExpr("""unhex("##")"""), Row(null)) checkAnswer(data.selectExpr("""unhex("G123")"""), Row(null)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala index 0000a5d1ef..1aadd700d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.columnar.compression import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import org.apache.commons.lang3.RandomStringUtils import org.apache.commons.math3.distribution.LogNormalDistribution @@ -313,7 +314,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes { } for (i <- 0 until count) { testData.putInt(strLen) - testData.put(g().getBytes) + testData.put(g().getBytes(StandardCharsets.UTF_8)) } testData.rewind() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala index 97638a66ab..67b3d98c1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import scala.util.Random @@ -357,7 +358,8 @@ object ColumnarBatchBenchmark { val maxString = 32 val count = 4 * 1000 - val data = Seq.fill(count)(randomString(minString, maxString)).map(_.getBytes).toArray + val data = Seq.fill(count)(randomString(minString, maxString)) + .map(_.getBytes(StandardCharsets.UTF_8)).toArray def column(memoryMode: MemoryMode) = { i: Int => val column = ColumnVector.allocate(count, BinaryType, memoryMode) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index b3c3e66fbc..ed97f59ea1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.vectorized +import java.nio.charset.StandardCharsets + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random @@ -329,18 +331,21 @@ class ColumnarBatchSuite extends SparkFunSuite { var idx = 0 val values = ("Hello" :: "abc" :: Nil).toArray - column.putByteArray(idx, values(0).getBytes, 0, values(0).getBytes().length) + column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8), + 0, values(0).getBytes(StandardCharsets.UTF_8).length) reference += values(0) idx += 1 assert(column.arrayData().elementsAppended == 5) - column.putByteArray(idx, values(1).getBytes, 0, values(1).getBytes().length) + column.putByteArray(idx, values(1).getBytes(StandardCharsets.UTF_8), + 0, values(1).getBytes(StandardCharsets.UTF_8).length) reference += values(1) idx += 1 assert(column.arrayData().elementsAppended == 8) // Just put llo - val offset = column.putByteArray(idx, values(0).getBytes, 2, values(0).getBytes().length - 2) + val offset = column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8), + 2, values(0).getBytes(StandardCharsets.UTF_8).length - 2) reference += "llo" idx += 1 assert(column.arrayData().elementsAppended == 11) @@ -353,7 +358,7 @@ class ColumnarBatchSuite extends SparkFunSuite { // Put a long string val s = "abcdefghijklmnopqrstuvwxyz" - column.putByteArray(idx, (s + s).getBytes) + column.putByteArray(idx, (s + s).getBytes(StandardCharsets.UTF_8)) reference += (s + s) idx += 1 assert(column.arrayData().elementsAppended == 11 + (s + s).length) @@ -473,7 +478,7 @@ class ColumnarBatchSuite extends SparkFunSuite { batch.column(0).putInt(0, 1) batch.column(1).putDouble(0, 1.1) batch.column(2).putNull(0) - batch.column(3).putByteArray(0, "Hello".getBytes) + batch.column(3).putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8)) batch.setNumRows(1) // Verify the results of the row. @@ -519,17 +524,17 @@ class ColumnarBatchSuite extends SparkFunSuite { batch.column(0).putNull(0) batch.column(1).putDouble(0, 2.2) batch.column(2).putInt(0, 2) - batch.column(3).putByteArray(0, "abc".getBytes) + batch.column(3).putByteArray(0, "abc".getBytes(StandardCharsets.UTF_8)) batch.column(0).putInt(1, 3) batch.column(1).putNull(1) batch.column(2).putInt(1, 3) - batch.column(3).putByteArray(1, "".getBytes) + batch.column(3).putByteArray(1, "".getBytes(StandardCharsets.UTF_8)) batch.column(0).putInt(2, 4) batch.column(1).putDouble(2, 4.4) batch.column(2).putInt(2, 4) - batch.column(3).putByteArray(2, "world".getBytes) + batch.column(3).putByteArray(2, "world".getBytes(StandardCharsets.UTF_8)) batch.setNumRows(3) def rowEquals(x: InternalRow, y: Row): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index e9d77abb8c..e6889bcc78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.streaming import java.io.{ByteArrayInputStream, File, FileNotFoundException, InputStream} - -import com.google.common.base.Charsets.UTF_8 +import java.nio.charset.StandardCharsets import org.apache.spark.sql.{AnalysisException, StreamTest} import org.apache.spark.sql.catalyst.util._ @@ -392,7 +391,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("readBatch") { - def stringToStream(str: String): InputStream = new ByteArrayInputStream(str.getBytes(UTF_8)) + def stringToStream(str: String): InputStream = + new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)) // Invalid metadata assert(readBatch(stringToStream("")) === Nil) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala index 83c63e04f3..7fa6760b71 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.test +import java.nio.charset.StandardCharsets + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext, SQLImplicits} @@ -103,11 +105,11 @@ private[sql] trait SQLTestData { self => protected lazy val binaryData: DataFrame = { val df = sqlContext.sparkContext.parallelize( - BinaryData("12".getBytes, 1) :: - BinaryData("22".getBytes, 5) :: - BinaryData("122".getBytes, 3) :: - BinaryData("121".getBytes, 2) :: - BinaryData("123".getBytes, 4) :: Nil).toDF() + BinaryData("12".getBytes(StandardCharsets.UTF_8), 1) :: + BinaryData("22".getBytes(StandardCharsets.UTF_8), 5) :: + BinaryData("122".getBytes(StandardCharsets.UTF_8), 3) :: + BinaryData("121".getBytes(StandardCharsets.UTF_8), 2) :: + BinaryData("123".getBytes(StandardCharsets.UTF_8), 4) :: Nil).toDF() df.registerTempTable("binaryData") df } |