author     Reynold Xin <rxin@databricks.com>   2015-06-11 16:07:15 -0700
committer  Reynold Xin <rxin@databricks.com>   2015-06-11 16:07:15 -0700
commit     7d669a56ffc7a4f5827830ef3c27d45cc0e8774f (patch)
tree       26345664c6976534f8ba212e60c4b65855053f3e
parent     9cbdf31ec1399d4d43a1863c15688ce78b6dfd92 (diff)
download   spark-7d669a56ffc7a4f5827830ef3c27d45cc0e8774f.tar.gz
           spark-7d669a56ffc7a4f5827830ef3c27d45cc0e8774f.tar.bz2
           spark-7d669a56ffc7a4f5827830ef3c27d45cc0e8774f.zip
[SPARK-8286] Rewrite UTF8String in Java and move it into unsafe package.
Unit test is still in Scala.

Author: Reynold Xin <rxin@databricks.com>

Closes #6738 from rxin/utf8string-java and squashes the following commits:

562dc6e [Reynold Xin] Flag...
98e600b [Reynold Xin] Another try with encoding setting ..
cfa6bdf [Reynold Xin] Merge branch 'master' into utf8string-java
a3b124d [Reynold Xin] Try different UTF-8 encoded characters.
1ff7c82 [Reynold Xin] Enable UTF-8 encoding.
82d58cc [Reynold Xin] Reset run-tests.
2cb3c69 [Reynold Xin] Use utf-8 encoding in set bytes.
53f8ef4 [Reynold Xin] Hack Jenkins to run one test.
9a48e8d [Reynold Xin] Fixed runtime compilation error.
911c450 [Reynold Xin] Moved unit test also to Java.
4eff7bd [Reynold Xin] Improved unit test coverage.
8e89a3c [Reynold Xin] Fixed tests.
77c64bd [Reynold Xin] Fixed string type codegen.
ffedb62 [Reynold Xin] Code review feedback.
0967ce6 [Reynold Xin] Fixed import ordering.
45a123d [Reynold Xin] [SPARK-8286] Rewrite UTF8String in Java and move it into unsafe package.
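For context, the user-facing change in this commit is that the Scala companion constructors UTF8String(str) / UTF8String(bytes) are replaced by explicit factory methods on the new Java class, and slice() becomes substring(). A minimal usage sketch against org.apache.spark.unsafe.types.UTF8String as introduced below (illustrative only; the wrapper class name here is made up and is not part of the commit):

import org.apache.spark.unsafe.types.UTF8String;

// Illustrative sketch, not part of the commit.
public class UTF8StringUsageSketch {
  public static void main(String[] args) {
    // The Scala apply(String) / apply(Array[Byte]) become explicit factories:
    UTF8String s = UTF8String.fromString("大千世界");
    UTF8String b = UTF8String.fromBytes(s.getBytes());

    System.out.println(s.length());                                  // 4 code points
    System.out.println(s.contains(UTF8String.fromString("千世")));   // true
    System.out.println(s.substring(1, 3));                           // 千世 (slice() is now substring())
    System.out.println(s.equals(b));                                 // true, byte-wise comparison
  }
}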
-rw-r--r-- project/SparkBuild.scala | 4
-rw-r--r-- sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala | 3
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 1
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 9
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala | 4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala | 1
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala | 3
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala | 7
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala | 3
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala | 1
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala | 221
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala | 1
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala | 10
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/types/UTF8StringSuite.scala | 70
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala | 3
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala | 3
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala | 9
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala | 11
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala | 7
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 3
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala | 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala | 7
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 13
-rw-r--r-- unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java | 212
-rw-r--r-- unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java | 1
-rw-r--r-- unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java | 93
34 files changed, 390 insertions, 335 deletions
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aa75a64b63..41b7eba3a0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -149,7 +149,9 @@ object SparkBuild extends PomBuild {
javacOptions in (Compile, doc) ++= {
val Array(major, minor, _) = System.getProperty("java.version").split("\\.", 3)
if (major.toInt >= 1 && minor.toInt >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing") else Seq.empty
- }
+ },
+
+ javacOptions in Compile ++= Seq("-encoding", "UTF-8")
)
def enable(settings: Seq[Setting[_]])(projectRef: ProjectRef) = {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index ec97fe603c..143acc9f5e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -30,7 +30,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.BaseMutableRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.UTF8String;
+import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.bitset.BitSetMethods;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index beb82dbc08..7e4b11a495 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -28,6 +28,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/**
* Functions to convert Scala types to Catalyst types and vice versa.
@@ -257,7 +258,7 @@ object CatalystTypeConverters {
private object StringConverter extends CatalystTypeConverter[Any, String, Any] {
override def toCatalystImpl(scalaValue: Any): UTF8String = scalaValue match {
- case str: String => UTF8String(str)
+ case str: String => UTF8String.fromString(str)
case utf8: UTF8String => utf8
}
override def toScala(catalystValue: Any): String = catalystValue match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 6998cc8d96..90698cd572 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 037efd7558..4c7123fcb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -24,6 +24,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/** Cast the child expression to the target data type. */
case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with Logging {
@@ -111,11 +112,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
- case BinaryType => buildCast[Array[Byte]](_, UTF8String(_))
- case DateType => buildCast[Int](_, d => UTF8String(DateUtils.toString(d)))
+ case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
+ case DateType => buildCast[Int](_, d => UTF8String.fromString(DateUtils.toString(d)))
case TimestampType => buildCast[Long](_,
- t => UTF8String(timestampToString(DateUtils.toJavaTimestamp(t))))
- case _ => buildCast[Any](_, o => UTF8String(o.toString))
+ t => UTF8String.fromString(timestampToString(DateUtils.toJavaTimestamp(t))))
+ case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
}
// BinaryConverter
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index 2c884517d6..98eda61a80 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/**
* A parent class for mutable container objects that are reused when the values are changed,
@@ -240,7 +241,8 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
}
}
- override def setString(ordinal: Int, value: String): Unit = update(ordinal, UTF8String(value))
+ override def setString(ordinal: Int, value: String): Unit =
+ update(ordinal, UTF8String.fromString(value))
override def getString(ordinal: Int): String = apply(ordinal).toString
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
index 5b2c857278..5350123bf4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.PlatformDependent
import org.apache.spark.unsafe.array.ByteArrayMethods
+import org.apache.spark.unsafe.types.UTF8String
/**
* Converts Rows into UnsafeRow format. This class is NOT thread-safe.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index ecf8e0d1a7..536e477330 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -26,6 +26,8 @@ import org.codehaus.janino.ClassBodyEvaluator
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
// These classes are here to avoid issues with serialization and integration with quasiquotes.
class IntegerHashSet extends org.apache.spark.util.collection.OpenHashSet[Int]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index ef50c50e13..a33007bda1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
object Literal {
def apply(v: Any): Literal = v match {
@@ -32,7 +33,7 @@ object Literal {
case f: Float => Literal(f, FloatType)
case b: Byte => Literal(b, ByteType)
case s: Short => Literal(s, ShortType)
- case s: String => Literal(UTF8String(s), StringType)
+ case s: String => Literal(UTF8String.fromString(s), StringType)
case b: Boolean => Literal(b, BooleanType)
case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 5fd892c42e..5d2d82077f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.types.{UTF8String, DataType, StructType, AtomicType}
+import org.apache.spark.sql.types.{DataType, StructType, AtomicType}
+import org.apache.spark.unsafe.types.UTF8String
/**
* An extended interface to [[Row]] that allows the values for each column to be updated. Setting
@@ -197,7 +198,9 @@ class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
override def setFloat(ordinal: Int, value: Float): Unit = { values(ordinal) = value }
override def setInt(ordinal: Int, value: Int): Unit = { values(ordinal) = value }
override def setLong(ordinal: Int, value: Long): Unit = { values(ordinal) = value }
- override def setString(ordinal: Int, value: String) { values(ordinal) = UTF8String(value)}
+ override def setString(ordinal: Int, value: String) {
+ values(ordinal) = UTF8String.fromString(value)
+ }
override def setNullAt(i: Int): Unit = { values(i) = null }
override def setShort(ordinal: Int, value: Short): Unit = { values(ordinal) = value }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
index 345038323d..4f4c19526e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -22,6 +22,7 @@ import java.util.regex.Pattern
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
trait StringRegexExpression extends ExpectsInputTypes {
self: BinaryExpression =>
@@ -277,7 +278,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
ba.slice(st, end)
case s: UTF8String =>
val (st, end) = slicePos(start, length, () => s.length())
- s.slice(st, end)
+ s.substring(st, end)
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
index 134ab0af4e..1e9476ad06 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.typeTag
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.unsafe.types.UTF8String
/**
* :: DeveloperApi ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
deleted file mode 100644
index f5d8fcced3..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.types
-
-import java.util.Arrays
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * A UTF-8 String, as internal representation of StringType in SparkSQL
- *
- * A String encoded in UTF-8 as an Array[Byte], which can be used for comparison,
- * search, see http://en.wikipedia.org/wiki/UTF-8 for details.
- *
- * Note: This is not designed for general use cases, should not be used outside SQL.
- */
-@DeveloperApi
-final class UTF8String extends Ordered[UTF8String] with Serializable {
-
- private[this] var bytes: Array[Byte] = _
-
- /**
- * Update the UTF8String with String.
- */
- def set(str: String): UTF8String = {
- bytes = str.getBytes("utf-8")
- this
- }
-
- /**
- * Update the UTF8String with Array[Byte], which should be encoded in UTF-8
- */
- def set(bytes: Array[Byte]): UTF8String = {
- this.bytes = bytes
- this
- }
-
- /**
- * Return the number of bytes for a code point with the first byte as `b`
- * @param b The first byte of a code point
- */
- @inline
- private[this] def numOfBytes(b: Byte): Int = {
- val offset = (b & 0xFF) - 192
- if (offset >= 0) UTF8String.bytesOfCodePointInUTF8(offset) else 1
- }
-
- /**
- * Return the number of code points in it.
- *
- * This is only used by Substring() when `start` is negative.
- */
- def length(): Int = {
- var len = 0
- var i: Int = 0
- while (i < bytes.length) {
- i += numOfBytes(bytes(i))
- len += 1
- }
- len
- }
-
- def getBytes: Array[Byte] = {
- bytes
- }
-
- /**
- * Return a substring of this,
- * @param start the position of first code point
- * @param until the position after last code point
- */
- def slice(start: Int, until: Int): UTF8String = {
- if (until <= start || start >= bytes.length || bytes == null) {
- new UTF8String
- }
-
- var c = 0
- var i: Int = 0
- while (c < start && i < bytes.length) {
- i += numOfBytes(bytes(i))
- c += 1
- }
- var j = i
- while (c < until && j < bytes.length) {
- j += numOfBytes(bytes(j))
- c += 1
- }
- UTF8String(Arrays.copyOfRange(bytes, i, j))
- }
-
- def contains(sub: UTF8String): Boolean = {
- val b = sub.getBytes
- if (b.length == 0) {
- return true
- }
- var i: Int = 0
- while (i <= bytes.length - b.length) {
- // In worst case, it's O(N*K), but should works fine with SQL
- if (bytes(i) == b(0) && Arrays.equals(Arrays.copyOfRange(bytes, i, i + b.length), b)) {
- return true
- }
- i += 1
- }
- false
- }
-
- def startsWith(prefix: UTF8String): Boolean = {
- val b = prefix.getBytes
- if (b.length > bytes.length) {
- return false
- }
- Arrays.equals(Arrays.copyOfRange(bytes, 0, b.length), b)
- }
-
- def endsWith(suffix: UTF8String): Boolean = {
- val b = suffix.getBytes
- if (b.length > bytes.length) {
- return false
- }
- Arrays.equals(Arrays.copyOfRange(bytes, bytes.length - b.length, bytes.length), b)
- }
-
- def toUpperCase(): UTF8String = {
- // upper case depends on locale, fallback to String.
- UTF8String(toString().toUpperCase)
- }
-
- def toLowerCase(): UTF8String = {
- // lower case depends on locale, fallback to String.
- UTF8String(toString().toLowerCase)
- }
-
- override def toString(): String = {
- new String(bytes, "utf-8")
- }
-
- override def clone(): UTF8String = new UTF8String().set(this.bytes)
-
- override def compare(other: UTF8String): Int = {
- var i: Int = 0
- val b = other.getBytes
- while (i < bytes.length && i < b.length) {
- val res = bytes(i).compareTo(b(i))
- if (res != 0) return res
- i += 1
- }
- bytes.length - b.length
- }
-
- override def compareTo(other: UTF8String): Int = {
- compare(other)
- }
-
- override def equals(other: Any): Boolean = other match {
- case s: UTF8String =>
- Arrays.equals(bytes, s.getBytes)
- case s: String =>
- // This is only used for Catalyst unit tests
- // fail fast
- bytes.length >= s.length && length() == s.length && toString() == s
- case _ =>
- false
- }
-
- override def hashCode(): Int = {
- Arrays.hashCode(bytes)
- }
-}
-
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
-object UTF8String {
- // number of tailing bytes in a UTF8 sequence for a code point
- // see http://en.wikipedia.org/wiki/UTF-8, 192-256 of Byte 1
- private[types] val bytesOfCodePointInUTF8: Array[Int] = Array(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
- 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
- 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
- 4, 4, 4, 4, 4, 4, 4, 4,
- 5, 5, 5, 5,
- 6, 6, 6, 6)
-
- /**
- * Create a UTF-8 String from String
- */
- def apply(s: String): UTF8String = {
- if (s != null) {
- new UTF8String().set(s)
- } else {
- null
- }
- }
-
- /**
- * Create a UTF-8 String from Array[Byte], which should be encoded in UTF-8
- */
- def apply(bytes: Array[Byte]): UTF8String = {
- if (bytes != null) {
- new UTF8String().set(bytes)
- } else {
- null
- }
- }
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
index f151dd2a47..bcc594cb7c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
index 88a36aa121..72bbc4efeb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.expressions
import scala.collection.JavaConverters._
import scala.util.Random
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, TaskMemoryManager, MemoryAllocator}
import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, TaskMemoryManager, MemoryAllocator}
+import org.apache.spark.unsafe.types.UTF8String
+
class UnsafeFixedWidthAggregationMapSuite
extends SparkFunSuite
@@ -82,7 +84,7 @@ class UnsafeFixedWidthAggregationMapSuite
1024, // initial capacity
false // disable perf metrics
)
- val groupKey = new GenericRow(Array[Any](UTF8String("cats")))
+ val groupKey = new GenericRow(Array[Any](UTF8String.fromString("cats")))
// Looking up a key stores a zero-entry in the map (like Python Counters or DefaultDicts)
map.getAggregationBuffer(groupKey)
@@ -111,7 +113,7 @@ class UnsafeFixedWidthAggregationMapSuite
val rand = new Random(42)
val groupKeys: Set[String] = Seq.fill(512)(rand.nextString(1024)).toSet
groupKeys.foreach { keyString =>
- map.getAggregationBuffer(new GenericRow(Array[Any](UTF8String(keyString))))
+ map.getAggregationBuffer(new GenericRow(Array[Any](UTF8String.fromString(keyString))))
}
val seenKeys: Set[String] = map.iterator().asScala.map { entry =>
entry.key.getString(0)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/UTF8StringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/UTF8StringSuite.scala
deleted file mode 100644
index 81d7ab010f..0000000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/UTF8StringSuite.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.types
-
-import org.apache.spark.SparkFunSuite
-
-// scalastyle:off
-class UTF8StringSuite extends SparkFunSuite {
- test("basic") {
- def check(str: String, len: Int) {
-
- assert(UTF8String(str).length == len)
- assert(UTF8String(str.getBytes("utf8")).length() == len)
-
- assert(UTF8String(str) == str)
- assert(UTF8String(str.getBytes("utf8")) == str)
- assert(UTF8String(str).toString == str)
- assert(UTF8String(str.getBytes("utf8")).toString == str)
- assert(UTF8String(str.getBytes("utf8")) == UTF8String(str))
-
- assert(UTF8String(str).hashCode() == UTF8String(str.getBytes("utf8")).hashCode())
- }
-
- check("hello", 5)
- check("世 界", 3)
- }
-
- test("contains") {
- assert(UTF8String("hello").contains(UTF8String("ello")))
- assert(!UTF8String("hello").contains(UTF8String("vello")))
- assert(UTF8String("大千世界").contains(UTF8String("千世")))
- assert(!UTF8String("大千世界").contains(UTF8String("世千")))
- }
-
- test("prefix") {
- assert(UTF8String("hello").startsWith(UTF8String("hell")))
- assert(!UTF8String("hello").startsWith(UTF8String("ell")))
- assert(UTF8String("大千世界").startsWith(UTF8String("大千")))
- assert(!UTF8String("大千世界").startsWith(UTF8String("千")))
- }
-
- test("suffix") {
- assert(UTF8String("hello").endsWith(UTF8String("ello")))
- assert(!UTF8String("hello").endsWith(UTF8String("ellov")))
- assert(UTF8String("大千世界").endsWith(UTF8String("世界")))
- assert(!UTF8String("大千世界").endsWith(UTF8String("世")))
- }
-
- test("slice") {
- assert(UTF8String("hello").slice(1, 3) == UTF8String("el"))
- assert(UTF8String("大千世界").slice(0, 1) == UTF8String("大"))
- assert(UTF8String("大千世界").slice(1, 3) == UTF8String("千世"))
- assert(UTF8String("大千世界").slice(3, 5) == UTF8String("界"))
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 83881a3687..11c79c865f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.columnar
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index c9c4d630fb..8e21020917 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/**
* An abstract class that represents type of a column. Used to append/extract Java objects into/from
@@ -320,7 +321,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
val length = buffer.getInt()
val stringBytes = new Array[Byte](length)
buffer.get(stringBytes, 0, length)
- UTF8String(stringBytes)
+ UTF8String.fromBytes(stringBytes)
}
override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 60f3b2d539..202e4488a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -28,6 +28,7 @@ import org.apache.spark.serializer._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, MutableRow, SpecificMutableRow}
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/**
* The serialization stream for [[SparkSqlSerializer2]]. It assumes that the object passed in
@@ -434,7 +435,7 @@ private[sql] object SparkSqlSerializer2 {
val length = in.readInt()
val bytes = new Array[Byte](length)
in.readFully(bytes)
- mutableRow.update(i, UTF8String(bytes))
+ mutableRow.update(i, UTF8String.fromBytes(bytes))
}
case BinaryType =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 720b529d59..83c1f65d5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.unsafe.types.UTF8String
import scala.collection.mutable.HashSet
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index b1333ec09a..2b45a83d14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import net.razorvine.pickle.{Pickler, Unpickler}
+import org.apache.spark.{Accumulator, Logging => SparkLogging}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
import org.apache.spark.broadcast.Broadcast
@@ -34,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
-import org.apache.spark.{Accumulator, Logging => SparkLogging}
+import org.apache.spark.unsafe.types.UTF8String
/**
* A serialized version of a Python lambda function. Suitable for use in a [[PythonRDD]].
@@ -203,8 +204,10 @@ object EvaluatePython {
case (c: Long, IntegerType) => c.toInt
case (c: Int, LongType) => c.toLong
case (c: Double, FloatType) => c.toFloat
- case (c: String, StringType) => UTF8String(c)
- case (c, StringType) if !c.isInstanceOf[String] => UTF8String(c.toString)
+ case (c: String, StringType) => UTF8String.fromString(c)
+ case (c, StringType) =>
+ // If we get here, c is not a string. Call toString on it.
+ UTF8String.fromString(c.toString)
case (c, _) => c
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 9028d5ed72..e75e6681c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources._
+import org.apache.spark.unsafe.types.UTF8String
/**
* Data corresponding to one partition of a JDBCRDD.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index 4e07cf36ae..f16075ce58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
private[sql] object JacksonParser {
def apply(
@@ -54,7 +56,7 @@ private[sql] object JacksonParser {
convertField(factory, parser, schema)
case (VALUE_STRING, StringType) =>
- UTF8String(parser.getText)
+ UTF8String.fromString(parser.getText)
case (VALUE_STRING, _) if parser.getTextLength < 1 =>
// guard the non string type
@@ -74,7 +76,7 @@ private[sql] object JacksonParser {
val generator = factory.createGenerator(writer, JsonEncoding.UTF8)
generator.copyCurrentStructure(parser)
generator.close()
- UTF8String(writer.toByteArray)
+ UTF8String.fromBytes(writer.toByteArray)
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
parser.getFloatValue
@@ -152,7 +154,8 @@ private[sql] object JacksonParser {
valueType: DataType): Map[UTF8String, Any] = {
val builder = Map.newBuilder[UTF8String, Any]
while (nextUntil(parser, JsonToken.END_OBJECT)) {
- builder += UTF8String(parser.getCurrentName) -> convertField(factory, parser, valueType)
+ builder +=
+ UTF8String.fromString(parser.getCurrentName) -> convertField(factory, parser, valueType)
}
builder.result()
@@ -180,7 +183,7 @@ private[sql] object JacksonParser {
val row = new GenericMutableRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
require(schema(corruptIndex).dataType == StringType)
- row.update(corruptIndex, UTF8String(record))
+ row.update(corruptIndex, UTF8String.fromString(record))
}
Seq(row)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index fb0d137bdb..e4acf1ddaf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
private[sql] object JsonRDD extends Logging {
@@ -317,7 +319,7 @@ private[sql] object JsonRDD extends Logging {
parsed
} catch {
case e: JsonProcessingException =>
- Map(columnNameOfCorruptRecords -> UTF8String(record)) :: Nil
+ Map(columnNameOfCorruptRecords -> UTF8String.fromString(record)) :: Nil
}
}
})
@@ -409,7 +411,7 @@ private[sql] object JsonRDD extends Logging {
null
} else {
desiredType match {
- case StringType => UTF8String(toString(value))
+ case StringType => UTF8String.fromString(toString(value))
case _ if value == null || value == "" => null // guard the non string type
case IntegerType => value.asInstanceOf[IntegerType.InternalType]
case LongType => toLong(value)
@@ -423,7 +425,7 @@ private[sql] object JsonRDD extends Logging {
val map = value.asInstanceOf[Map[String, Any]]
map.map {
case (k, v) =>
- (UTF8String(k), enforceCorrectType(v, valueType))
+ (UTF8String.fromString(k), enforceCorrectType(v, valueType))
}.map(identity)
case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
case DateType => toDate(value)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index ddc5097f88..ab9f878d1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.sql.types._
import org.apache.spark.sql.parquet.timestamp.NanoTime
+import org.apache.spark.unsafe.types.UTF8String
/**
* Collection of converters of Parquet types (group and primitive types) that
@@ -222,7 +223,7 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
updateField(fieldIndex, value.getBytes)
protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit =
- updateField(fieldIndex, UTF8String(value))
+ updateField(fieldIndex, UTF8String.fromBytes(value))
protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
updateField(fieldIndex, readTimestamp(value))
@@ -423,7 +424,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
current.update(fieldIndex, value.getBytes)
override protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit =
- current.update(fieldIndex, UTF8String(value))
+ current.update(fieldIndex, UTF8String.fromBytes(value))
override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
current.setLong(fieldIndex, readTimestamp(value))
@@ -719,7 +720,7 @@ private[parquet] class CatalystNativeArrayConverter(
override protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit = {
checkGrowBuffer()
- buffer(elements) = UTF8String(value).asInstanceOf[NativeType]
+ buffer(elements) = UTF8String.fromBytes(value).asInstanceOf[NativeType]
elements += 1
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 88ae88e968..4d659f261a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
private[sql] object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index e03dbdec04..c62c592b3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/**
* A `parquet.io.api.RecordMaterializer` for Rows.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index c6a4dabbab..edda3f2017 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
import org.apache.spark.util.Utils
+import org.apache.spark.unsafe.types.UTF8String
/**
* A Strategy for planning scans over data sources defined using the sources API.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 8421e670ff..6daddfb2c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -22,12 +22,14 @@ import java.nio.ByteBuffer
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
+import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.columnar.ColumnarTestUtils._
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.types._
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
+import org.apache.spark.unsafe.types.UTF8String
+
class ColumnTypeSuite extends SparkFunSuite with Logging {
val DEFAULT_BUFFER_SIZE = 512
@@ -66,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
checkActualSize(FLOAT, Float.MaxValue, 4)
checkActualSize(FIXED_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
checkActualSize(BOOLEAN, true, 1)
- checkActualSize(STRING, UTF8String("hello"), 4 + "hello".getBytes("utf-8").length)
+ checkActualSize(STRING, UTF8String.fromString("hello"), 4 + "hello".getBytes("utf-8").length)
checkActualSize(DATE, 0, 4)
checkActualSize(TIMESTAMP, 0L, 8)
@@ -118,7 +120,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
val length = buffer.getInt()
val bytes = new Array[Byte](length)
buffer.get(bytes)
- UTF8String(bytes)
+ UTF8String.fromBytes(bytes)
})
testColumnType[BinaryType.type, Array[Byte]](
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index c5d38595c0..1bc7eb3631 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -22,7 +22,10 @@ import scala.util.Random
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{AtomicType, DataType, Decimal, UTF8String}
+import org.apache.spark.sql.types.{AtomicType, DataType, Decimal}
+import org.apache.spark.sql.types.{DataType, Decimal, AtomicType}
+import org.apache.spark.unsafe.types.UTF8String
+
object ColumnarTestUtils {
def makeNullRow(length: Int): GenericMutableRow = {
@@ -46,7 +49,7 @@ object ColumnarTestUtils {
case FLOAT => Random.nextFloat()
case DOUBLE => Random.nextDouble()
case FIXED_DECIMAL(precision, scale) => Decimal(Random.nextLong() % 100, precision, scale)
- case STRING => UTF8String(Random.nextString(Random.nextInt(32)))
+ case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32)))
case BOOLEAN => Random.nextBoolean()
case BINARY => randomBytes(Random.nextInt(32))
case DATE => Random.nextInt()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 1f14cba78f..fd01a8722b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -242,9 +243,9 @@ private[hive] trait HiveInspectors {
def unwrap(data: Any, oi: ObjectInspector): Any = oi match {
case coi: ConstantObjectInspector if coi.getWritableConstantValue == null => null
case poi: WritableConstantStringObjectInspector =>
- UTF8String(poi.getWritableConstantValue.toString)
+ UTF8String.fromString(poi.getWritableConstantValue.toString)
case poi: WritableConstantHiveVarcharObjectInspector =>
- UTF8String(poi.getWritableConstantValue.getHiveVarchar.getValue)
+ UTF8String.fromString(poi.getWritableConstantValue.getHiveVarchar.getValue)
case poi: WritableConstantHiveDecimalObjectInspector =>
HiveShim.toCatalystDecimal(
PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector,
@@ -288,13 +289,13 @@ private[hive] trait HiveInspectors {
case pi: PrimitiveObjectInspector => pi match {
// We think HiveVarchar is also a String
case hvoi: HiveVarcharObjectInspector if hvoi.preferWritable() =>
- UTF8String(hvoi.getPrimitiveWritableObject(data).getHiveVarchar.getValue)
+ UTF8String.fromString(hvoi.getPrimitiveWritableObject(data).getHiveVarchar.getValue)
case hvoi: HiveVarcharObjectInspector =>
- UTF8String(hvoi.getPrimitiveJavaObject(data).getValue)
+ UTF8String.fromString(hvoi.getPrimitiveJavaObject(data).getValue)
case x: StringObjectInspector if x.preferWritable() =>
- UTF8String(x.getPrimitiveWritableObject(data).toString)
+ UTF8String.fromString(x.getPrimitiveWritableObject(data).toString)
case x: StringObjectInspector =>
- UTF8String(x.getPrimitiveJavaObject(data))
+ UTF8String.fromString(x.getPrimitiveJavaObject(data))
case x: IntObjectInspector if x.preferWritable() => x.get(data)
case x: BooleanObjectInspector if x.preferWritable() => x.get(data)
case x: FloatObjectInspector if x.preferWritable() => x.get(data)
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
new file mode 100644
index 0000000000..a351680195
--- /dev/null
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+import org.apache.spark.unsafe.PlatformDependent;
+
+/**
+ * A UTF-8 String for internal Spark use.
+ * <p>
+ * A String encoded in UTF-8 as an Array[Byte], which can be used for comparison,
+ * search, see http://en.wikipedia.org/wiki/UTF-8 for details.
+ * <p>
+ * Note: This is not designed for general use cases, should not be used outside SQL.
+ */
+public final class UTF8String implements Comparable<UTF8String>, Serializable {
+
+ @Nullable
+ private byte[] bytes;
+
+ private static int[] bytesOfCodePointInUTF8 = {2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
+ 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
+ 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
+ 4, 4, 4, 4, 4, 4, 4, 4,
+ 5, 5, 5, 5,
+ 6, 6, 6, 6};
+
+ public static UTF8String fromBytes(byte[] bytes) {
+ return (bytes != null) ? new UTF8String().set(bytes) : null;
+ }
+
+ public static UTF8String fromString(String str) {
+ return (str != null) ? new UTF8String().set(str) : null;
+ }
+
+ /**
+ * Updates the UTF8String with String.
+ */
+ public UTF8String set(final String str) {
+ try {
+ bytes = str.getBytes("utf-8");
+ } catch (UnsupportedEncodingException e) {
+ // Turn the exception into unchecked so we can find out about it at runtime, but
+ // don't need to add lots of boilerplate code everywhere.
+ PlatformDependent.throwException(e);
+ }
+ return this;
+ }
+
+ /**
+ * Updates the UTF8String with byte[], which should be encoded in UTF-8.
+ */
+ public UTF8String set(final byte[] bytes) {
+ this.bytes = bytes;
+ return this;
+ }
+
+ /**
+ * Returns the number of bytes for a code point with the first byte as `b`
+ * @param b The first byte of a code point
+ */
+ public int numBytes(final byte b) {
+ final int offset = (b & 0xFF) - 192;
+ return (offset >= 0) ? bytesOfCodePointInUTF8[offset] : 1;
+ }
+
+ /**
+ * Returns the number of code points in it.
+ *
+ * This is only used by Substring() when `start` is negative.
+ */
+ public int length() {
+ int len = 0;
+ for (int i = 0; i < bytes.length; i+= numBytes(bytes[i])) {
+ len += 1;
+ }
+ return len;
+ }
+
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ /**
+ * Returns a substring of this.
+ * @param start the position of first code point
+ * @param until the position after last code point, exclusive.
+ */
+ public UTF8String substring(final int start, final int until) {
+ if (until <= start || start >= bytes.length) {
+ return UTF8String.fromBytes(new byte[0]);
+ }
+
+ int i = 0;
+ int c = 0;
+ for (; i < bytes.length && c < start; i += numBytes(bytes[i])) {
+ c += 1;
+ }
+
+ int j = i;
+ for (; j < bytes.length && c < until; j += numBytes(bytes[i])) {
+ c += 1;
+ }
+
+ return UTF8String.fromBytes(Arrays.copyOfRange(bytes, i, j));
+ }
+
+ public boolean contains(final UTF8String substring) {
+ final byte[] b = substring.getBytes();
+ if (b.length == 0) {
+ return true;
+ }
+
+ for (int i = 0; i <= bytes.length - b.length; i++) {
+ // TODO: Avoid copying.
+ if (bytes[i] == b[0] && Arrays.equals(Arrays.copyOfRange(bytes, i, i + b.length), b)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean startsWith(final UTF8String prefix) {
+ final byte[] b = prefix.getBytes();
+ // TODO: Avoid copying.
+ return b.length <= bytes.length && Arrays.equals(Arrays.copyOfRange(bytes, 0, b.length), b);
+ }
+
+ public boolean endsWith(final UTF8String suffix) {
+ final byte[] b = suffix.getBytes();
+ return b.length <= bytes.length &&
+ Arrays.equals(Arrays.copyOfRange(bytes, bytes.length - b.length, bytes.length), b);
+ }
+
+ public UTF8String toUpperCase() {
+ return UTF8String.fromString(toString().toUpperCase());
+ }
+
+ public UTF8String toLowerCase() {
+ return UTF8String.fromString(toString().toLowerCase());
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return new String(bytes, "utf-8");
+ } catch (UnsupportedEncodingException e) {
+ // Turn the exception into unchecked so we can find out about it at runtime, but
+ // don't need to add lots of boilerplate code everywhere.
+ PlatformDependent.throwException(e);
+ return "unknown"; // we will never reach here.
+ }
+ }
+
+ @Override
+ public UTF8String clone() {
+ return new UTF8String().set(bytes);
+ }
+
+ @Override
+ public int compareTo(final UTF8String other) {
+ final byte[] b = other.getBytes();
+ for (int i = 0; i < bytes.length && i < b.length; i++) {
+ int res = bytes[i] - b[i];
+ if (res != 0) {
+ return res;
+ }
+ }
+ return bytes.length - b.length;
+ }
+
+ public int compare(final UTF8String other) {
+ return compareTo(other);
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (other instanceof UTF8String) {
+ return Arrays.equals(bytes, ((UTF8String) other).getBytes());
+ } else if (other instanceof String) {
+ // Used only in unit tests.
+ String s = (String) other;
+ return bytes.length >= s.length() && length() == s.length() && toString().equals(s);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(bytes);
+ }
+}
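As an aside (not part of the commit), a minimal standalone sketch of how the bytesOfCodePointInUTF8 table and numBytes() in the new class above determine code-point widths and hence length(); the class name here is made up for illustration:

import java.io.UnsupportedEncodingException;

// Illustrative sketch mirroring the lead-byte lookup logic in UTF8String.java above.
public class Utf8WidthSketch {
  // Indexed by (leadByte & 0xFF) - 192, i.e. lead bytes 0xC0..0xFF.
  private static final int[] WIDTHS = {
    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
    3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
    4, 4, 4, 4, 4, 4, 4, 4,
    5, 5, 5, 5,
    6, 6, 6, 6};

  static int numBytes(byte b) {
    int offset = (b & 0xFF) - 192;
    // ASCII bytes and UTF-8 continuation bytes fall below 0xC0 and count as width 1.
    return (offset >= 0) ? WIDTHS[offset] : 1;
  }

  public static void main(String[] args) throws UnsupportedEncodingException {
    byte[] bytes = "世 界".getBytes("utf-8");     // 3 + 1 + 3 bytes, 3 code points
    int codePoints = 0;
    for (int i = 0; i < bytes.length; i += numBytes(bytes[i])) {
      codePoints++;
    }
    System.out.println(codePoints);               // 3, matching checkBasic("世 界", 3) in the test below
  }
}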
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
index 18393db9f3..a93fc0ee29 100644
--- a/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/bitset/BitSetSuite.java
@@ -18,7 +18,6 @@
package org.apache.spark.unsafe.bitset;
import junit.framework.Assert;
-import org.apache.spark.unsafe.bitset.BitSet;
import org.junit.Test;
import org.apache.spark.unsafe.memory.MemoryBlock;
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
new file mode 100644
index 0000000000..80c179a1b5
--- /dev/null
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
@@ -0,0 +1,93 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.unsafe.types;
+
+import java.io.UnsupportedEncodingException;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+public class UTF8StringSuite {
+
+ private void checkBasic(String str, int len) throws UnsupportedEncodingException {
+ Assert.assertEquals(UTF8String.fromString(str).length(), len);
+ Assert.assertEquals(UTF8String.fromBytes(str.getBytes("utf8")).length(), len);
+
+ Assert.assertEquals(UTF8String.fromString(str), str);
+ Assert.assertEquals(UTF8String.fromBytes(str.getBytes("utf8")), str);
+ Assert.assertEquals(UTF8String.fromString(str).toString(), str);
+ Assert.assertEquals(UTF8String.fromBytes(str.getBytes("utf8")).toString(), str);
+ Assert.assertEquals(UTF8String.fromBytes(str.getBytes("utf8")), UTF8String.fromString(str));
+
+ Assert.assertEquals(UTF8String.fromString(str).hashCode(),
+ UTF8String.fromBytes(str.getBytes("utf8")).hashCode());
+ }
+
+ @Test
+ public void basicTest() throws UnsupportedEncodingException {
+ checkBasic("hello", 5);
+ checkBasic("世 界", 3);
+ }
+
+ @Test
+ public void contains() {
+ Assert.assertTrue(UTF8String.fromString("hello").contains(UTF8String.fromString("ello")));
+ Assert.assertFalse(UTF8String.fromString("hello").contains(UTF8String.fromString("vello")));
+ Assert.assertFalse(UTF8String.fromString("hello").contains(UTF8String.fromString("hellooo")));
+ Assert.assertTrue(UTF8String.fromString("大千世界").contains(UTF8String.fromString("千世")));
+ Assert.assertFalse(UTF8String.fromString("大千世界").contains(UTF8String.fromString("世千")));
+ Assert.assertFalse(
+ UTF8String.fromString("大千世界").contains(UTF8String.fromString("大千世界好")));
+ }
+
+ @Test
+ public void startsWith() {
+ Assert.assertTrue(UTF8String.fromString("hello").startsWith(UTF8String.fromString("hell")));
+ Assert.assertFalse(UTF8String.fromString("hello").startsWith(UTF8String.fromString("ell")));
+ Assert.assertFalse(UTF8String.fromString("hello").startsWith(UTF8String.fromString("hellooo")));
+ Assert.assertTrue(UTF8String.fromString("数据砖头").startsWith(UTF8String.fromString("数据")));
+ Assert.assertFalse(UTF8String.fromString("大千世界").startsWith(UTF8String.fromString("千")));
+ Assert.assertFalse(
+ UTF8String.fromString("大千世界").startsWith(UTF8String.fromString("大千世界好")));
+ }
+
+ @Test
+ public void endsWith() {
+ Assert.assertTrue(UTF8String.fromString("hello").endsWith(UTF8String.fromString("ello")));
+ Assert.assertFalse(UTF8String.fromString("hello").endsWith(UTF8String.fromString("ellov")));
+ Assert.assertFalse(UTF8String.fromString("hello").endsWith(UTF8String.fromString("hhhello")));
+ Assert.assertTrue(UTF8String.fromString("大千世界").endsWith(UTF8String.fromString("世界")));
+ Assert.assertFalse(UTF8String.fromString("大千世界").endsWith(UTF8String.fromString("世")));
+ Assert.assertFalse(
+ UTF8String.fromString("数据砖头").endsWith(UTF8String.fromString("我的数据砖头")));
+ }
+
+ @Test
+ public void substring() {
+ Assert.assertEquals(
+ UTF8String.fromString("hello").substring(0, 0), UTF8String.fromString(""));
+ Assert.assertEquals(
+ UTF8String.fromString("hello").substring(1, 3), UTF8String.fromString("el"));
+ Assert.assertEquals(
+ UTF8String.fromString("数据砖头").substring(0, 1), UTF8String.fromString("数"));
+ Assert.assertEquals(
+ UTF8String.fromString("数据砖头").substring(1, 3), UTF8String.fromString("据砖"));
+ Assert.assertEquals(
+ UTF8String.fromString("数据砖头").substring(3, 5), UTF8String.fromString("头"));
+ }
+}