author     Davies Liu <davies@databricks.com>         2015-04-15 13:06:38 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-04-15 13:06:38 -0700
commit     85842760dc4616577162f44cc0fa9db9bd23bd9c (patch)
tree       3f0d8c9e0b9cb75c6fed3e2e3d6b5302a384d600 /sql/hive
parent     785f95586b951d7b05481ee925fb95c20c4d6b6f (diff)
[SPARK-6638] [SQL] Improve performance of StringType in SQL
This PR changes the internal representation of StringType from java.lang.String to UTF8String, which is implemented using Array[Byte]. It should not break any public API; Row.getString() will still return java.lang.String. This is the first step in improving the performance of String in SQL.

cc rxin

Author: Davies Liu <davies@databricks.com>

Closes #5350 from davies/string and squashes the following commits:

3b7bfa8 [Davies Liu] fix schema of AddJar
2772f0d [Davies Liu] fix new test failure
6d776a9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into string
59025c8 [Davies Liu] address comments from @marmbrus
341ec2c [Davies Liu] turn off scala style check in UTF8StringSuite
744788f [Davies Liu] Merge branch 'master' of github.com:apache/spark into string
b04a19c [Davies Liu] add comment for getString/setString
08d897b [Davies Liu] Merge branch 'master' of github.com:apache/spark into string
5116b43 [Davies Liu] rollback unrelated changes
1314a37 [Davies Liu] address comments from Yin
867bf50 [Davies Liu] fix String filter push down
13d9d42 [Davies Liu] Merge branch 'master' of github.com:apache/spark into string
2089d24 [Davies Liu] add hashcode check back
ac18ae6 [Davies Liu] address comment
fd11364 [Davies Liu] optimize UTF8String
8d17f21 [Davies Liu] fix hive compatibility tests
e5fa5b8 [Davies Liu] remove clone in UTF8String
28f3d81 [Davies Liu] Merge branch 'master' of github.com:apache/spark into string
28d6f32 [Davies Liu] refactor
537631c [Davies Liu] some comment about Date
9f4c194 [Davies Liu] convert data type for data source
956b0a4 [Davies Liu] fix hive tests
73e4363 [Davies Liu] Merge branch 'master' of github.com:apache/spark into string
9dc32d1 [Davies Liu] fix some hive tests
23a766c [Davies Liu] refactor
8b45864 [Davies Liu] fix codegen with UTF8String
bb52e44 [Davies Liu] fix scala style
c7dd4d2 [Davies Liu] fix some catalyst tests
38c303e [Davies Liu] fix python sql tests
5f9e120 [Davies Liu] fix sql tests
6b499ac [Davies Liu] fix style
a85fb27 [Davies Liu] refactor
d32abd1 [Davies Liu] fix utf8 for python api
4699c3a [Davies Liu] use Array[Byte] in UTF8String
21f67c6 [Davies Liu] cleanup
685fd07 [Davies Liu] use UTF8String instead of String for StringType
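The core of the change is a wrap/unwrap discipline at the boundary of catalyst's internal row format: java.lang.String is converted to UTF8String on the way in and back to java.lang.String on the way out. A minimal sketch of that discipline, assuming only the Spark 1.x API visible in the diff below (org.apache.spark.sql.types.UTF8String with an apply(String) factory and a toString method); the object and method names here are illustrative:

    import org.apache.spark.sql.types.UTF8String

    object UTF8StringBoundary {
      // Entering catalyst: an external java.lang.String becomes a
      // UTF8String, stored internally as a UTF-8 encoded Array[Byte].
      def toCatalyst(s: String): UTF8String = UTF8String(s)

      // Leaving catalyst (e.g. Row.getString, or handing a value to Hive):
      // render the bytes back to java.lang.String.
      def fromCatalyst(u: UTF8String): String = u.toString

      def main(args: Array[String]): Unit = {
        val internal = toCatalyst("héllo")
        println(fromCatalyst(internal)) // prints: héllo
      }
    }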
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala                  22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                  13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala  17
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala              10
-rw-r--r--  sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala                   4
-rw-r--r--  sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala                  36
6 files changed, 52 insertions(+), 50 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 921c6194c7..74ae984f34 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -34,7 +34,7 @@ import scala.collection.JavaConversions._
* 1. The Underlying data type in catalyst and in Hive
* In catalyst:
* Primitive =>
- * java.lang.String
+ * UTF8String
* int / scala.Int
* boolean / scala.Boolean
* float / scala.Float
@@ -239,9 +239,10 @@ private[hive] trait HiveInspectors {
*/
def unwrap(data: Any, oi: ObjectInspector): Any = oi match {
case coi: ConstantObjectInspector if coi.getWritableConstantValue == null => null
- case poi: WritableConstantStringObjectInspector => poi.getWritableConstantValue.toString
+ case poi: WritableConstantStringObjectInspector =>
+ UTF8String(poi.getWritableConstantValue.toString)
case poi: WritableConstantHiveVarcharObjectInspector =>
- poi.getWritableConstantValue.getHiveVarchar.getValue
+ UTF8String(poi.getWritableConstantValue.getHiveVarchar.getValue)
case poi: WritableConstantHiveDecimalObjectInspector =>
HiveShim.toCatalystDecimal(
PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector,
@@ -284,10 +285,13 @@ private[hive] trait HiveInspectors {
case pi: PrimitiveObjectInspector => pi match {
// We think HiveVarchar is also a String
case hvoi: HiveVarcharObjectInspector if hvoi.preferWritable() =>
- hvoi.getPrimitiveWritableObject(data).getHiveVarchar.getValue
- case hvoi: HiveVarcharObjectInspector => hvoi.getPrimitiveJavaObject(data).getValue
+ UTF8String(hvoi.getPrimitiveWritableObject(data).getHiveVarchar.getValue)
+ case hvoi: HiveVarcharObjectInspector =>
+ UTF8String(hvoi.getPrimitiveJavaObject(data).getValue)
case x: StringObjectInspector if x.preferWritable() =>
- x.getPrimitiveWritableObject(data).toString
+ UTF8String(x.getPrimitiveWritableObject(data).toString)
+ case x: StringObjectInspector =>
+ UTF8String(x.getPrimitiveJavaObject(data))
case x: IntObjectInspector if x.preferWritable() => x.get(data)
case x: BooleanObjectInspector if x.preferWritable() => x.get(data)
case x: FloatObjectInspector if x.preferWritable() => x.get(data)
@@ -340,7 +344,9 @@ private[hive] trait HiveInspectors {
*/
protected def wrapperFor(oi: ObjectInspector): Any => Any = oi match {
case _: JavaHiveVarcharObjectInspector =>
- (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)
+ (o: Any) =>
+ val s = o.asInstanceOf[UTF8String].toString
+ new HiveVarchar(s, s.size)
case _: JavaHiveDecimalObjectInspector =>
(o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toJavaBigDecimal)
@@ -409,7 +415,7 @@ private[hive] trait HiveInspectors {
case x: PrimitiveObjectInspector => x match {
// TODO we don't support the HiveVarcharObjectInspector yet.
case _: StringObjectInspector if x.preferWritable() => HiveShim.getStringWritable(a)
- case _: StringObjectInspector => a.asInstanceOf[java.lang.String]
+ case _: StringObjectInspector => a.asInstanceOf[UTF8String].toString()
case _: IntObjectInspector if x.preferWritable() => HiveShim.getIntWritable(a)
case _: IntObjectInspector => a.asInstanceOf[java.lang.Integer]
case _: BooleanObjectInspector if x.preferWritable() => HiveShim.getBooleanWritable(a)
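The unwrap cases above convert every Hive string flavor (writable or Java, plain String or HiveVarchar) into UTF8String on the way into catalyst, and wrapperFor/wrap do the reverse on the way out. A hedged sketch of the varchar wrapper, using only the conversions shown in this hunk:

    import org.apache.hadoop.hive.common.`type`.HiveVarchar
    import org.apache.spark.sql.types.UTF8String

    // Catalyst now hands the wrapper a UTF8String; render it to a
    // java.lang.String before building the HiveVarchar (length in chars).
    val varcharWrapper: Any => Any = (o: Any) => {
      val s = o.asInstanceOf[UTF8String].toString
      new HiveVarchar(s, s.length)
    }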
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 1ccb0c279c..a6f4fbe8ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,24 +17,21 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.catalyst.expressions.Row
-
import scala.collection.JavaConversions._
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.expressions.{Row, _}
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.sources.DescribeCommand
-import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
-import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing}
+import org.apache.spark.sql.sources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand}
import org.apache.spark.sql.types.StringType
@@ -131,7 +128,7 @@ private[hive] trait HiveStrategies {
val partitionValues = part.getValues
var i = 0
while (i < partitionValues.size()) {
- inputData(i) = partitionValues(i)
+ inputData(i) = CatalystTypeConverters.convertToCatalyst(partitionValues(i))
i += 1
}
pruningCondition(inputData)
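Partition values come back from the Hive metastore as java.lang.String, while the generated pruning predicate now compares catalyst-internal values, so each value must be converted before the predicate is evaluated. A sketch of that single step, assuming the one-argument convertToCatalyst overload the hunk uses:

    import org.apache.spark.sql.catalyst.CatalystTypeConverters

    val fromMetastore: Any = "2015-04-15"   // partition value as a String
    val forPredicate = CatalystTypeConverters.convertToCatalyst(fromMetastore)
    // forPredicate is a UTF8String, matching the literals the pruning
    // predicate was compiled against.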
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 8efed7f029..cab0fdd357 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -17,8 +17,7 @@
package org.apache.spark.sql.hive.execution
-import java.io.{BufferedReader, InputStreamReader}
-import java.io.{DataInputStream, DataOutputStream, EOFException}
+import java.io.{BufferedReader, DataInputStream, DataOutputStream, EOFException, InputStreamReader}
import java.util.Properties
import scala.collection.JavaConversions._
@@ -28,12 +27,13 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
+import org.apache.spark.sql.types.DataType
import org.apache.spark.util.Utils
/**
@@ -121,14 +121,13 @@ case class ScriptTransformation(
if (outputSerde == null) {
val prevLine = curLine
curLine = reader.readLine()
-
if (!ioschema.schemaLess) {
- new GenericRow(
- prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
+ new GenericRow(CatalystTypeConverters.convertToCatalyst(
+ prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")))
.asInstanceOf[Array[Any]])
} else {
- new GenericRow(
- prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2)
+ new GenericRow(CatalystTypeConverters.convertToCatalyst(
+ prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2))
.asInstanceOf[Array[Any]])
}
} else {
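A transformed script emits plain text, so each output line is split into java.lang.String fields that must be converted before being placed in a catalyst row. A sketch of that path; the element-wise behaviour of convertToCatalyst on arrays is inferred from the Array[Any] cast in the hunk above:

    import org.apache.spark.sql.catalyst.CatalystTypeConverters
    import org.apache.spark.sql.catalyst.expressions.GenericRow

    val line = "a\tb\tc"
    val fields = line.split("\t")                           // Array[String]
    val converted = CatalystTypeConverters.convertToCatalyst(fields)
    val row = new GenericRow(converted.asInstanceOf[Array[Any]])
    // each cell of row is now a UTF8String rather than a java.lang.String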
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 902a12785e..a40a1e5311 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -22,11 +22,11 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
/**
* Analyzes the given table in the current database to generate statistics, which will be
@@ -76,6 +76,12 @@ case class DropTable(
private[hive]
case class AddJar(path: String) extends RunnableCommand {
+ override val output: Seq[Attribute] = {
+ val schema = StructType(
+ StructField("result", IntegerType, false) :: Nil)
+ schema.toAttributes
+ }
+
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
hiveContext.runSqlHive(s"ADD JAR $path")
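AddJar previously let its single result column default to a string, which broke once string values had to be UTF8String internally ("fix schema of AddJar" in the commit log); declaring an explicit IntegerType schema sidesteps the conversion entirely. The same pattern for any command's output, sketched with the types imported above:

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // Declare the command's output schema explicitly so the returned
    // Row(0) is typed as an Int, not inferred as a string column.
    val output: Seq[Attribute] =
      StructType(StructField("result", IntegerType, nullable = false) :: Nil)
        .toAttributes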
diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
index 0ed93c2c5b..33e96eaabf 100644
--- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
+++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
@@ -41,7 +41,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, TypeInfoFactory}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.InputFormat
-import org.apache.spark.sql.types.{Decimal, DecimalType}
+import org.apache.spark.sql.types.{UTF8String, Decimal, DecimalType}
private[hive] case class HiveFunctionWrapper(functionClassName: String)
extends java.io.Serializable {
@@ -135,7 +135,7 @@ private[hive] object HiveShim {
PrimitiveCategory.VOID, null)
def getStringWritable(value: Any): hadoopIo.Text =
- if (value == null) null else new hadoopIo.Text(value.asInstanceOf[String])
+ if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
def getIntWritable(value: Any): hadoopIo.IntWritable =
if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index 7577309900..d331c210e8 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -17,37 +17,35 @@
package org.apache.spark.sql.hive
-import java.util
-import java.util.{ArrayList => JArrayList}
-import java.util.Properties
import java.rmi.server.UID
+import java.util.{Properties, ArrayList => JArrayList}
import scala.collection.JavaConversions._
import scala.language.implicitConversions
+import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.common.`type`.{HiveDecimal}
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.metadata.{Table, Hive, Partition}
+import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
+import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, DecimalTypeInfo, TypeInfoFactory}
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, PrimitiveObjectInspector, ObjectInspector}
-import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
-import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, PrimitiveObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfo, TypeInfoFactory}
+import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io => hiveIo}
+import org.apache.hadoop.io.{NullWritable, Writable}
+import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.{io => hadoopIo}
import org.apache.spark.Logging
-import org.apache.spark.sql.types.{Decimal, DecimalType}
-
+import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String}
/**
* This class provides the UDF creation and also the UDF instance serialization and
@@ -63,18 +61,14 @@ private[hive] case class HiveFunctionWrapper(var functionClassName: String)
// for Serialization
def this() = this(null)
- import java.io.{OutputStream, InputStream}
- import com.esotericsoftware.kryo.Kryo
import org.apache.spark.util.Utils._
- import org.apache.hadoop.hive.ql.exec.Utilities
- import org.apache.hadoop.hive.ql.exec.UDF
@transient
private val methodDeSerialize = {
val method = classOf[Utilities].getDeclaredMethod(
"deserializeObjectByKryo",
classOf[Kryo],
- classOf[InputStream],
+ classOf[java.io.InputStream],
classOf[Class[_]])
method.setAccessible(true)
@@ -87,7 +81,7 @@ private[hive] case class HiveFunctionWrapper(var functionClassName: String)
"serializeObjectByKryo",
classOf[Kryo],
classOf[Object],
- classOf[OutputStream])
+ classOf[java.io.OutputStream])
method.setAccessible(true)
method
@@ -224,7 +218,7 @@ private[hive] object HiveShim {
TypeInfoFactory.voidTypeInfo, null)
def getStringWritable(value: Any): hadoopIo.Text =
- if (value == null) null else new hadoopIo.Text(value.asInstanceOf[String])
+ if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
def getIntWritable(value: Any): hadoopIo.IntWritable =
if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
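Both shims now apply the same null-safe conversion when handing a string back to Hadoop: the catalyst UTF8String is rendered to a java.lang.String before being wrapped in a Text. A usage sketch of the updated getStringWritable, assuming a value produced by catalyst:

    import org.apache.spark.sql.types.UTF8String

    val v: Any = UTF8String("spark")
    val text = HiveShim.getStringWritable(v)    // hadoopIo.Text("spark")
    val none = HiveShim.getStringWritable(null) // null passes through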