author     Wenchen Fan <cloud0fan@outlook.com>    2015-06-25 22:44:26 -0700
committer  Davies Liu <davies@databricks.com>     2015-06-25 22:44:26 -0700
commit     1a79f0eb8da7e850c443383b3bb24e0bf8e1e7cb
tree       85061123f7de88e73e317304ee8fec392f54e7db /sql/catalyst/src
parent     40360112c417b5432564f4bcb8a9100f4066b55e
[SPARK-8635] [SQL] improve performance of CatalystTypeConverters
In `CatalystTypeConverters.createToCatalystConverter`, we add special handling for primitive types. We can apply this strategy to more places to improve performance.

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7018 from cloud-fan/converter and squashes the following commits:

8b16630 [Wenchen Fan] another fix
326c82c [Wenchen Fan] optimize type converter
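To make the strategy concrete before the diff, here is a minimal self-contained sketch of the fast-path idea: resolve the converter once per schema, and when a type (or everything nested inside a collection type) is stored the same way on both sides, hand back `identity` so the per-value hot path does no work. The `SimpleType` hierarchy below is a hypothetical stand-in for Catalyst's `DataType`, not the real API; the actual change follows in the diff.

// Hypothetical stand-in for Catalyst's DataType hierarchy, for illustration only.
sealed trait SimpleType
case object IntType extends SimpleType                      // stored as-is on both sides
case object StringType extends SimpleType                   // needs a real conversion
case class ArrType(element: SimpleType) extends SimpleType  // collection type

object ConverterSketch {
  private def isPrimitive(dt: SimpleType): Boolean = dt == IntType

  // Mirrors the patch's isWholePrimitive: a collection is conversion-free
  // only if everything nested inside it is conversion-free too.
  private def isWholePrimitive(dt: SimpleType): Boolean = dt match {
    case dt if isPrimitive(dt) => true
    case ArrType(element)      => isWholePrimitive(element)
    case _                     => false
  }

  // Called once per schema; the returned function is applied to every value.
  def createToScalaConverter(dt: SimpleType): Any => Any = dt match {
    case _ if isWholePrimitive(dt) =>
      identity  // fast path: no per-value dispatch, no copying
    case StringType =>
      (v: Any) => new String(v.asInstanceOf[Array[Byte]], "UTF-8")  // stand-in for UTF8String.toString
    case ArrType(element) =>
      val elementConverter = createToScalaConverter(element)  // resolved once, not per element
      (v: Any) => v.asInstanceOf[Seq[Any]].map(elementConverter)
    case IntType =>
      identity
  }
}

Building the converter once amortizes the type dispatch across all rows, and the `identity` fast path skips even the per-element `map` that the pre-patch code always paid.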
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala | 60
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala   |  3
2 files changed, 38 insertions, 25 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 429fc4077b..012f8bbecb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -52,6 +52,13 @@ object CatalystTypeConverters {
}
}
+ private def isWholePrimitive(dt: DataType): Boolean = dt match {
+ case dt if isPrimitive(dt) => true
+ case ArrayType(elementType, _) => isWholePrimitive(elementType)
+ case MapType(keyType, valueType, _) => isWholePrimitive(keyType) && isWholePrimitive(valueType)
+ case _ => false
+ }
+
private def getConverterForType(dataType: DataType): CatalystTypeConverter[Any, Any, Any] = {
val converter = dataType match {
case udt: UserDefinedType[_] => UDTConverter(udt)
@@ -148,6 +155,8 @@ object CatalystTypeConverters {
private[this] val elementConverter = getConverterForType(elementType)
+ private[this] val isNoChange = isWholePrimitive(elementType)
+
override def toCatalystImpl(scalaValue: Any): Seq[Any] = {
scalaValue match {
case a: Array[_] => a.toSeq.map(elementConverter.toCatalyst)
@@ -166,8 +175,10 @@ object CatalystTypeConverters {
override def toScala(catalystValue: Seq[Any]): Seq[Any] = {
if (catalystValue == null) {
null
+ } else if (isNoChange) {
+ catalystValue
} else {
- catalystValue.asInstanceOf[Seq[_]].map(elementConverter.toScala)
+ catalystValue.map(elementConverter.toScala)
}
}
@@ -183,6 +194,8 @@ object CatalystTypeConverters {
private[this] val keyConverter = getConverterForType(keyType)
private[this] val valueConverter = getConverterForType(valueType)
+ private[this] val isNoChange = isWholePrimitive(keyType) && isWholePrimitive(valueType)
+
override def toCatalystImpl(scalaValue: Any): Map[Any, Any] = scalaValue match {
case m: Map[_, _] =>
m.map { case (k, v) =>
@@ -203,6 +216,8 @@ object CatalystTypeConverters {
override def toScala(catalystValue: Map[Any, Any]): Map[Any, Any] = {
if (catalystValue == null) {
null
+ } else if (isNoChange) {
+ catalystValue
} else {
catalystValue.map { case (k, v) =>
keyConverter.toScala(k) -> valueConverter.toScala(v)
@@ -258,16 +273,13 @@ object CatalystTypeConverters {
toScala(row(column).asInstanceOf[InternalRow])
}
- private object StringConverter extends CatalystTypeConverter[Any, String, Any] {
+ private object StringConverter extends CatalystTypeConverter[Any, String, UTF8String] {
override def toCatalystImpl(scalaValue: Any): UTF8String = scalaValue match {
case str: String => UTF8String.fromString(str)
case utf8: UTF8String => utf8
}
- override def toScala(catalystValue: Any): String = catalystValue match {
- case null => null
- case str: String => str
- case utf8: UTF8String => utf8.toString()
- }
+ override def toScala(catalystValue: UTF8String): String =
+ if (catalystValue == null) null else catalystValue.toString
override def toScalaImpl(row: InternalRow, column: Int): String = row(column).toString
}
@@ -275,7 +287,8 @@ object CatalystTypeConverters {
override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue)
override def toScala(catalystValue: Any): Date =
if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
- override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column))
+ override def toScalaImpl(row: InternalRow, column: Int): Date =
+ DateTimeUtils.toJavaDate(row.getInt(column))
}
private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
@@ -285,7 +298,7 @@ object CatalystTypeConverters {
if (catalystValue == null) null
else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
- toScala(row.getLong(column))
+ DateTimeUtils.toJavaTimestamp(row.getLong(column))
}
private object BigDecimalConverter extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
@@ -296,10 +309,7 @@ object CatalystTypeConverters {
}
override def toScala(catalystValue: Decimal): JavaBigDecimal = catalystValue.toJavaBigDecimal
override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal =
- row.get(column) match {
- case d: JavaBigDecimal => d
- case d: Decimal => d.toJavaBigDecimal
- }
+ row.get(column).asInstanceOf[Decimal].toJavaBigDecimal
}
private abstract class PrimitiveConverter[T] extends CatalystTypeConverter[T, Any, Any] {
@@ -363,6 +373,19 @@ object CatalystTypeConverters {
}
/**
+ * Creates a converter function that will convert Catalyst types to Scala type.
+ * Typical use case would be converting a collection of rows that have the same schema. You will
+ * call this function once to get a converter, and apply it to every row.
+ */
+ private[sql] def createToScalaConverter(dataType: DataType): Any => Any = {
+ if (isPrimitive(dataType)) {
+ identity
+ } else {
+ getConverterForType(dataType).toScala
+ }
+ }
+
+ /**
* Converts Scala objects to Catalyst rows / types.
*
* Note: This should be called before doing evaluation on a Row
@@ -389,15 +412,6 @@ object CatalystTypeConverters {
* produced by createToScalaConverter.
*/
def convertToScala(catalystValue: Any, dataType: DataType): Any = {
- getConverterForType(dataType).toScala(catalystValue)
- }
-
- /**
- * Creates a converter function that will convert Catalyst types to Scala type.
- * Typical use case would be converting a collection of rows that have the same schema. You will
- * call this function once to get a converter, and apply it to every row.
- */
- private[sql] def createToScalaConverter(dataType: DataType): Any => Any = {
- getConverterForType(dataType).toScala
+ createToScalaConverter(dataType)(catalystValue)
}
}
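The doc comment added above spells out the intended calling pattern: get the converter once per schema and apply it to every row. A sketch of that pattern follows; since `createToScalaConverter` is `private[sql]`, real call sites live inside the `org.apache.spark.sql` package, and the values below are assumed example inputs, not code from this patch.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.{ArrayType, LongType}

object ConverterUsageSketch {
  // Resolve the converter once for the column's data type...
  val toScala = CatalystTypeConverters.createToScalaConverter(ArrayType(LongType, containsNull = false))

  // ...then apply it to every value. ArrayType(LongType) is "wholly
  // primitive", so with this patch each Seq comes back unchanged
  // instead of being rebuilt element by element.
  val catalystValues: Seq[Any] = Seq(Seq(1L, 2L), Seq(3L))
  val scalaValues: Seq[Any] = catalystValues.map(toScala)
}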
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index 3992f1f59d..55df72f102 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.DataType
@@ -39,7 +38,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
(1 to 22).map { x =>
val anys = (1 to x).map(x => "Any").reduce(_ + ", " + _)
val childs = (0 to x - 1).map(x => s"val child$x = children($x)").reduce(_ + "\n " + _)
- lazy val converters = (0 to x - 1).map(x => s"lazy val converter$x = CatalystTypeConverters.createToScalaConverter(child$x.dataType)").reduce(_ + "\n " + _)
+ val converters = (0 to x - 1).map(x => s"lazy val converter$x = CatalystTypeConverters.createToScalaConverter(child$x.dataType)").reduce(_ + "\n " + _)
val evals = (0 to x - 1).map(x => s"converter$x(child$x.eval(input))").reduce(_ + ",\n " + _)
s"""case $x =>