author    Reynold Xin <rxin@databricks.com>    2015-06-11 16:07:15 -0700
committer Reynold Xin <rxin@databricks.com>    2015-06-11 16:07:15 -0700
commit    7d669a56ffc7a4f5827830ef3c27d45cc0e8774f (patch)
tree      26345664c6976534f8ba212e60c4b65855053f3e /sql/core
parent    9cbdf31ec1399d4d43a1863c15688ce78b6dfd92 (diff)
[SPARK-8286] Rewrite UTF8String in Java and move it into unsafe package.
Unit test is still in Scala.

Author: Reynold Xin <rxin@databricks.com>

Closes #6738 from rxin/utf8string-java and squashes the following commits:

562dc6e [Reynold Xin] Flag...
98e600b [Reynold Xin] Another try with encoding setting ..
cfa6bdf [Reynold Xin] Merge branch 'master' into utf8string-java
a3b124d [Reynold Xin] Try different UTF-8 encoded characters.
1ff7c82 [Reynold Xin] Enable UTF-8 encoding.
82d58cc [Reynold Xin] Reset run-tests.
2cb3c69 [Reynold Xin] Use utf-8 encoding in set bytes.
53f8ef4 [Reynold Xin] Hack Jenkins to run one test.
9a48e8d [Reynold Xin] Fixed runtime compilation error.
911c450 [Reynold Xin] Moved unit test also to Java.
4eff7bd [Reynold Xin] Improved unit test coverage.
8e89a3c [Reynold Xin] Fixed tests.
77c64bd [Reynold Xin] Fixed string type codegen.
ffedb62 [Reynold Xin] Code review feedback.
0967ce6 [Reynold Xin] Fixed import ordering.
45a123d [Reynold Xin] [SPARK-8286] Rewrite UTF8String in Java and move it into unsafe package.
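The call-site change applied throughout sql/core below is the move from the companion apply of the old org.apache.spark.sql.types.UTF8String to the factory methods on the new Java class in the unsafe package. A minimal before/after sketch in Scala, based only on the calls visible in this diff (the val names are illustrative, not from the patch):

    // old (Scala UTF8String in org.apache.spark.sql.types)
    //   val s = UTF8String("hello")
    //   val b = UTF8String("hello".getBytes("utf-8"))

    // new (Java UTF8String in org.apache.spark.unsafe.types)
    import org.apache.spark.unsafe.types.UTF8String

    val s: UTF8String = UTF8String.fromString("hello")
    val b: UTF8String = UTF8String.fromBytes("hello".getBytes("utf-8"))
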
Diffstat (limited to 'sql/core')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala | 3
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala | 3
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala | 9
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala | 11
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala | 7
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 3
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala | 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala | 7
14 files changed, 43 insertions, 21 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 83881a3687..11c79c865f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.columnar
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index c9c4d630fb..8e21020917 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/**
* An abstract class that represents type of a column. Used to append/extract Java objects into/from
@@ -320,7 +321,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
val length = buffer.getInt()
val stringBytes = new Array[Byte](length)
buffer.get(stringBytes, 0, length)
- UTF8String(stringBytes)
+ UTF8String.fromBytes(stringBytes)
}
override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 60f3b2d539..202e4488a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -28,6 +28,7 @@ import org.apache.spark.serializer._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, MutableRow, SpecificMutableRow}
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/**
* The serialization stream for [[SparkSqlSerializer2]]. It assumes that the object passed in
@@ -434,7 +435,7 @@ private[sql] object SparkSqlSerializer2 {
val length = in.readInt()
val bytes = new Array[Byte](length)
in.readFully(bytes)
- mutableRow.update(i, UTF8String(bytes))
+ mutableRow.update(i, UTF8String.fromBytes(bytes))
}
case BinaryType =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 720b529d59..83c1f65d5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.unsafe.types.UTF8String
import scala.collection.mutable.HashSet
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index b1333ec09a..2b45a83d14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import net.razorvine.pickle.{Pickler, Unpickler}
+import org.apache.spark.{Accumulator, Logging => SparkLogging}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
import org.apache.spark.broadcast.Broadcast
@@ -34,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
-import org.apache.spark.{Accumulator, Logging => SparkLogging}
+import org.apache.spark.unsafe.types.UTF8String
/**
* A serialized version of a Python lambda function. Suitable for use in a [[PythonRDD]].
@@ -203,8 +204,10 @@ object EvaluatePython {
case (c: Long, IntegerType) => c.toInt
case (c: Int, LongType) => c.toLong
case (c: Double, FloatType) => c.toFloat
- case (c: String, StringType) => UTF8String(c)
- case (c, StringType) if !c.isInstanceOf[String] => UTF8String(c.toString)
+ case (c: String, StringType) => UTF8String.fromString(c)
+ case (c, StringType) =>
+ // If we get here, c is not a string. Call toString on it.
+ UTF8String.fromString(c.toString)
case (c, _) => c
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 9028d5ed72..e75e6681c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources._
+import org.apache.spark.unsafe.types.UTF8String
/**
* Data corresponding to one partition of a JDBCRDD.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index 4e07cf36ae..f16075ce58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
private[sql] object JacksonParser {
def apply(
@@ -54,7 +56,7 @@ private[sql] object JacksonParser {
convertField(factory, parser, schema)
case (VALUE_STRING, StringType) =>
- UTF8String(parser.getText)
+ UTF8String.fromString(parser.getText)
case (VALUE_STRING, _) if parser.getTextLength < 1 =>
// guard the non string type
@@ -74,7 +76,7 @@ private[sql] object JacksonParser {
val generator = factory.createGenerator(writer, JsonEncoding.UTF8)
generator.copyCurrentStructure(parser)
generator.close()
- UTF8String(writer.toByteArray)
+ UTF8String.fromBytes(writer.toByteArray)
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
parser.getFloatValue
@@ -152,7 +154,8 @@ private[sql] object JacksonParser {
valueType: DataType): Map[UTF8String, Any] = {
val builder = Map.newBuilder[UTF8String, Any]
while (nextUntil(parser, JsonToken.END_OBJECT)) {
- builder += UTF8String(parser.getCurrentName) -> convertField(factory, parser, valueType)
+ builder +=
+ UTF8String.fromString(parser.getCurrentName) -> convertField(factory, parser, valueType)
}
builder.result()
@@ -180,7 +183,7 @@ private[sql] object JacksonParser {
val row = new GenericMutableRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
require(schema(corruptIndex).dataType == StringType)
- row.update(corruptIndex, UTF8String(record))
+ row.update(corruptIndex, UTF8String.fromString(record))
}
Seq(row)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index fb0d137bdb..e4acf1ddaf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
private[sql] object JsonRDD extends Logging {
@@ -317,7 +319,7 @@ private[sql] object JsonRDD extends Logging {
parsed
} catch {
case e: JsonProcessingException =>
- Map(columnNameOfCorruptRecords -> UTF8String(record)) :: Nil
+ Map(columnNameOfCorruptRecords -> UTF8String.fromString(record)) :: Nil
}
}
})
@@ -409,7 +411,7 @@ private[sql] object JsonRDD extends Logging {
null
} else {
desiredType match {
- case StringType => UTF8String(toString(value))
+ case StringType => UTF8String.fromString(toString(value))
case _ if value == null || value == "" => null // guard the non string type
case IntegerType => value.asInstanceOf[IntegerType.InternalType]
case LongType => toLong(value)
@@ -423,7 +425,7 @@ private[sql] object JsonRDD extends Logging {
val map = value.asInstanceOf[Map[String, Any]]
map.map {
case (k, v) =>
- (UTF8String(k), enforceCorrectType(v, valueType))
+ (UTF8String.fromString(k), enforceCorrectType(v, valueType))
}.map(identity)
case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
case DateType => toDate(value)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index ddc5097f88..ab9f878d1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.sql.types._
import org.apache.spark.sql.parquet.timestamp.NanoTime
+import org.apache.spark.unsafe.types.UTF8String
/**
* Collection of converters of Parquet types (group and primitive types) that
@@ -222,7 +223,7 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
updateField(fieldIndex, value.getBytes)
protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit =
- updateField(fieldIndex, UTF8String(value))
+ updateField(fieldIndex, UTF8String.fromBytes(value))
protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
updateField(fieldIndex, readTimestamp(value))
@@ -423,7 +424,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
current.update(fieldIndex, value.getBytes)
override protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit =
- current.update(fieldIndex, UTF8String(value))
+ current.update(fieldIndex, UTF8String.fromBytes(value))
override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
current.setLong(fieldIndex, readTimestamp(value))
@@ -719,7 +720,7 @@ private[parquet] class CatalystNativeArrayConverter(
override protected[parquet] def updateString(fieldIndex: Int, value: Array[Byte]): Unit = {
checkGrowBuffer()
- buffer(elements) = UTF8String(value).asInstanceOf[NativeType]
+ buffer(elements) = UTF8String.fromBytes(value).asInstanceOf[NativeType]
elements += 1
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 88ae88e968..4d659f261a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
private[sql] object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index e03dbdec04..c62c592b3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
/**
* A `parquet.io.api.RecordMaterializer` for Rows.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index c6a4dabbab..edda3f2017 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
import org.apache.spark.util.Utils
+import org.apache.spark.unsafe.types.UTF8String
/**
* A Strategy for planning scans over data sources defined using the sources API.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 8421e670ff..6daddfb2c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -22,12 +22,14 @@ import java.nio.ByteBuffer
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
+import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.columnar.ColumnarTestUtils._
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.types._
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
+import org.apache.spark.unsafe.types.UTF8String
+
class ColumnTypeSuite extends SparkFunSuite with Logging {
val DEFAULT_BUFFER_SIZE = 512
@@ -66,7 +68,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
checkActualSize(FLOAT, Float.MaxValue, 4)
checkActualSize(FIXED_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
checkActualSize(BOOLEAN, true, 1)
- checkActualSize(STRING, UTF8String("hello"), 4 + "hello".getBytes("utf-8").length)
+ checkActualSize(STRING, UTF8String.fromString("hello"), 4 + "hello".getBytes("utf-8").length)
checkActualSize(DATE, 0, 4)
checkActualSize(TIMESTAMP, 0L, 8)
@@ -118,7 +120,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
val length = buffer.getInt()
val bytes = new Array[Byte](length)
buffer.get(bytes)
- UTF8String(bytes)
+ UTF8String.fromBytes(bytes)
})
testColumnType[BinaryType.type, Array[Byte]](
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index c5d38595c0..1bc7eb3631 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -22,7 +22,10 @@ import scala.util.Random
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{AtomicType, DataType, Decimal, UTF8String}
+import org.apache.spark.sql.types.{AtomicType, DataType, Decimal}
+import org.apache.spark.unsafe.types.UTF8String
+
object ColumnarTestUtils {
def makeNullRow(length: Int): GenericMutableRow = {
@@ -46,7 +49,7 @@ object ColumnarTestUtils {
case FLOAT => Random.nextFloat()
case DOUBLE => Random.nextDouble()
case FIXED_DECIMAL(precision, scale) => Decimal(Random.nextLong() % 100, precision, scale)
- case STRING => UTF8String(Random.nextString(Random.nextInt(32)))
+ case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32)))
case BOOLEAN => Random.nextBoolean()
case BINARY => randomBytes(Random.nextInt(32))
case DATE => Random.nextInt()